1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3use axum::{
8 extract::{Path, Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Json},
11 routing::{delete, get, post, put},
12 Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default capacity of the MQTT/Kafka message broadcast channel; may be
/// overridden at runtime via `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY`
/// (see `get_message_broadcast_capacity`).
#[cfg(any(feature = "mqtt", feature = "kafka"))]
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;
32
/// Resolves the broadcast-channel capacity from the
/// `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` environment variable, falling back
/// to [`DEFAULT_MESSAGE_BROADCAST_CAPACITY`] when the variable is unset or
/// does not parse as a `usize`.
#[cfg(any(feature = "mqtt", feature = "kafka"))]
fn get_message_broadcast_capacity() -> usize {
    match std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY),
        Err(_) => DEFAULT_MESSAGE_BROADCAST_CAPACITY,
    }
}
41
/// A message observed on one of the optional broker mocks, serialized for
/// consumers as `{"protocol": "...", "data": {...}}` (serde tag/content).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// An MQTT message event (compiled only with the `mqtt` feature).
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A Kafka record event (compiled only with the `kafka` feature).
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
54
/// Snapshot of a single MQTT message as broadcast to observers.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload rendered as a string.
    pub payload: String,
    /// Quality-of-service level as reported by the broker.
    pub qos: u8,
    /// Whether the message carried the retain flag.
    pub retain: bool,
    /// Event timestamp (string-formatted; format set by the producer).
    pub timestamp: String,
}
70
/// Snapshot of a single Kafka record as broadcast to observers.
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the record was produced to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value rendered as a string.
    pub value: String,
    /// Partition the record landed on.
    pub partition: i32,
    /// Offset within the partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Event timestamp (string-formatted; format set by the producer).
    pub timestamp: String,
}
83
/// A single mock endpoint definition managed through the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique identifier; generated (UUID) by `create_mock` when empty.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable name, used in lifecycle events.
    pub name: String,
    /// HTTP method to match (compared case-insensitively).
    pub method: String,
    /// Path pattern; supports `{param}` segments and `*` wildcards
    /// (see `path_matches_pattern`).
    pub path: String,
    /// Response returned when this mock matches.
    pub response: MockResponse,
    /// Disabled mocks never match; defaults to `true` when omitted.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional artificial latency in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional HTTP status code override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Optional fine-grained request matching criteria.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Optional priority for resolving competing matches.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Optional scenario name this mock belongs to.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// Scenario state required for this mock to be eligible.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// Scenario state to transition to after this mock fires.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
}
123
/// Serde default helper: mocks are enabled unless explicitly disabled.
fn default_true() -> bool {
    true
}
127
/// Response payload served when a mock matches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body (any JSON value).
    pub body: serde_json::Value,
    /// Optional additional response headers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
137
/// Fine-grained matching criteria evaluated by `mock_matches_request`.
/// All configured criteria must pass for the mock to match.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RequestMatchCriteria {
    /// Header name (case-insensitive) -> expected value; the value is tried
    /// as a regex first, with literal comparison as fallback.
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub headers: std::collections::HashMap<String, String>,
    /// Query parameter name -> expected value (exact comparison).
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub query_params: std::collections::HashMap<String, String>,
    /// Pattern applied to the request body (regex, literal fallback).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body_pattern: Option<String>,
    /// Simplified JSON path that must resolve inside a JSON body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_path: Option<String>,
    /// Simplified XPath that must match inside an XML body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub xpath: Option<String>,
    /// Mini-DSL expression (see `evaluate_custom_matcher`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub custom_matcher: Option<String>,
}
160
161pub fn mock_matches_request(
170 mock: &MockConfig,
171 method: &str,
172 path: &str,
173 headers: &std::collections::HashMap<String, String>,
174 query_params: &std::collections::HashMap<String, String>,
175 body: Option<&[u8]>,
176) -> bool {
177 use regex::Regex;
178
179 if !mock.enabled {
181 return false;
182 }
183
184 if mock.method.to_uppercase() != method.to_uppercase() {
186 return false;
187 }
188
189 if !path_matches_pattern(&mock.path, path) {
191 return false;
192 }
193
194 if let Some(criteria) = &mock.request_match {
196 for (key, expected_value) in &criteria.headers {
198 let header_key_lower = key.to_lowercase();
199 let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
200
201 if let Some((_, actual_value)) = found {
202 if let Ok(re) = Regex::new(expected_value) {
204 if !re.is_match(actual_value) {
205 return false;
206 }
207 } else if actual_value != expected_value {
208 return false;
209 }
210 } else {
211 return false; }
213 }
214
215 for (key, expected_value) in &criteria.query_params {
217 if let Some(actual_value) = query_params.get(key) {
218 if actual_value != expected_value {
219 return false;
220 }
221 } else {
222 return false; }
224 }
225
226 if let Some(pattern) = &criteria.body_pattern {
228 if let Some(body_bytes) = body {
229 let body_str = String::from_utf8_lossy(body_bytes);
230 if let Ok(re) = Regex::new(pattern) {
232 if !re.is_match(&body_str) {
233 return false;
234 }
235 } else if body_str.as_ref() != pattern {
236 return false;
237 }
238 } else {
239 return false; }
241 }
242
243 if let Some(json_path) = &criteria.json_path {
245 if let Some(body_bytes) = body {
246 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
247 if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
248 if !json_path_exists(&json_value, json_path) {
250 return false;
251 }
252 }
253 }
254 }
255 }
256
257 if let Some(xpath) = &criteria.xpath {
259 if let Some(body_bytes) = body {
260 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
261 if !xml_xpath_exists(body_str, xpath) {
262 return false;
263 }
264 } else {
265 return false;
266 }
267 } else {
268 return false; }
270 }
271
272 if let Some(custom) = &criteria.custom_matcher {
274 if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
275 return false;
276 }
277 }
278 }
279
280 true
281}
282
283fn path_matches_pattern(pattern: &str, path: &str) -> bool {
285 if pattern == path {
287 return true;
288 }
289
290 if pattern == "*" {
292 return true;
293 }
294
295 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
297 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
298
299 if pattern_parts.len() != path_parts.len() {
300 if pattern.contains('*') {
302 return matches_wildcard_pattern(pattern, path);
303 }
304 return false;
305 }
306
307 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
308 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
310 continue; }
312
313 if pattern_part != path_part {
314 return false;
315 }
316 }
317
318 true
319}
320
321fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
323 use regex::Regex;
324
325 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
327
328 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
329 return re.is_match(path);
330 }
331
332 false
333}
334
335fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
343 let path = if json_path == "$" {
344 return true;
345 } else if let Some(p) = json_path.strip_prefix("$.") {
346 p
347 } else if let Some(p) = json_path.strip_prefix('$') {
348 p.strip_prefix('.').unwrap_or(p)
349 } else {
350 json_path
351 };
352
353 let mut current = json;
354 for segment in split_json_path_segments(path) {
355 match segment {
356 JsonPathSegment::Field(name) => {
357 if let Some(obj) = current.as_object() {
358 if let Some(value) = obj.get(name) {
359 current = value;
360 } else {
361 return false;
362 }
363 } else {
364 return false;
365 }
366 }
367 JsonPathSegment::Index(idx) => {
368 if let Some(arr) = current.as_array() {
369 if let Some(value) = arr.get(idx) {
370 current = value;
371 } else {
372 return false;
373 }
374 } else {
375 return false;
376 }
377 }
378 JsonPathSegment::Wildcard => {
379 if let Some(arr) = current.as_array() {
380 return !arr.is_empty();
381 }
382 return false;
383 }
384 }
385 }
386 true
387}
388
/// One step of a simplified JSON path (see `split_json_path_segments`).
enum JsonPathSegment<'a> {
    /// Object member access by key.
    Field(&'a str),
    /// Array element access by index.
    Index(usize),
    /// `[*]` — any element of an array.
    Wildcard,
}
394
395fn split_json_path_segments(path: &str) -> Vec<JsonPathSegment<'_>> {
397 let mut segments = Vec::new();
398 for part in path.split('.') {
399 if part.is_empty() {
400 continue;
401 }
402 if let Some(bracket_start) = part.find('[') {
403 let field_name = &part[..bracket_start];
404 if !field_name.is_empty() {
405 segments.push(JsonPathSegment::Field(field_name));
406 }
407 let bracket_content = &part[bracket_start + 1..part.len() - 1];
408 if bracket_content == "*" {
409 segments.push(JsonPathSegment::Wildcard);
410 } else if let Ok(idx) = bracket_content.parse::<usize>() {
411 segments.push(JsonPathSegment::Index(idx));
412 }
413 } else {
414 segments.push(JsonPathSegment::Field(part));
415 }
416 }
417 segments
418}
419
/// One parsed step of the simplified XPath dialect: an element name plus an
/// optional `text()="..."` equality predicate.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    /// Element (tag) name to match.
    name: String,
    /// When set, the element's trimmed text content must equal this value.
    text_equals: Option<String>,
}
425
426fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
427 if segment.is_empty() {
428 return None;
429 }
430
431 let trimmed = segment.trim();
432 if let Some(bracket_start) = trimmed.find('[') {
433 if !trimmed.ends_with(']') {
434 return None;
435 }
436
437 let name = trimmed[..bracket_start].trim();
438 let predicate = &trimmed[bracket_start + 1..trimmed.len() - 1];
439 let predicate = predicate.trim();
440
441 if let Some(raw) = predicate.strip_prefix("text()=") {
443 let raw = raw.trim();
444 if raw.len() >= 2
445 && ((raw.starts_with('"') && raw.ends_with('"'))
446 || (raw.starts_with('\'') && raw.ends_with('\'')))
447 {
448 let text = raw[1..raw.len() - 1].to_string();
449 if !name.is_empty() {
450 return Some(XPathSegment {
451 name: name.to_string(),
452 text_equals: Some(text),
453 });
454 }
455 }
456 }
457
458 None
459 } else {
460 Some(XPathSegment {
461 name: trimmed.to_string(),
462 text_equals: None,
463 })
464 }
465}
466
467fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
468 if !node.is_element() {
469 return false;
470 }
471 if node.tag_name().name() != segment.name {
472 return false;
473 }
474 match &segment.text_equals {
475 Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
476 None => true,
477 }
478}
479
480fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
487 let doc = match roxmltree::Document::parse(xml_body) {
488 Ok(doc) => doc,
489 Err(err) => {
490 tracing::warn!("Failed to parse XML for XPath matching: {}", err);
491 return false;
492 }
493 };
494
495 let expr = xpath.trim();
496 if expr.is_empty() {
497 return false;
498 }
499
500 let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
501 (true, rest)
502 } else if let Some(rest) = expr.strip_prefix('/') {
503 (false, rest)
504 } else {
505 tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
506 return false;
507 };
508
509 let segments: Vec<XPathSegment> = path_str
510 .split('/')
511 .filter(|s| !s.trim().is_empty())
512 .filter_map(parse_xpath_segment)
513 .collect();
514
515 if segments.is_empty() {
516 return false;
517 }
518
519 if is_descendant {
520 let first = &segments[0];
521 for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
522 let mut frontier = vec![node];
523 for segment in &segments[1..] {
524 let mut next_frontier = Vec::new();
525 for parent in &frontier {
526 for child in parent.children().filter(|n| segment_matches(*n, segment)) {
527 next_frontier.push(child);
528 }
529 }
530 if next_frontier.is_empty() {
531 frontier.clear();
532 break;
533 }
534 frontier = next_frontier;
535 }
536 if !frontier.is_empty() {
537 return true;
538 }
539 }
540 false
541 } else {
542 let mut frontier = vec![doc.root_element()];
543 for (index, segment) in segments.iter().enumerate() {
544 let mut next_frontier = Vec::new();
545 for parent in &frontier {
546 if index == 0 {
547 if segment_matches(*parent, segment) {
548 next_frontier.push(*parent);
549 }
550 continue;
551 }
552 for child in parent.children().filter(|n| segment_matches(*n, segment)) {
553 next_frontier.push(child);
554 }
555 }
556 if next_frontier.is_empty() {
557 return false;
558 }
559 frontier = next_frontier;
560 }
561 !frontier.is_empty()
562 }
563}
564
565fn evaluate_custom_matcher(
567 expression: &str,
568 method: &str,
569 path: &str,
570 headers: &std::collections::HashMap<String, String>,
571 query_params: &std::collections::HashMap<String, String>,
572 body: Option<&[u8]>,
573) -> bool {
574 use regex::Regex;
575
576 let expr = expression.trim();
577
578 if expr.contains("==") {
580 let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
581 if parts.len() != 2 {
582 return false;
583 }
584
585 let field = parts[0];
586 let expected_value = parts[1].trim_matches('"').trim_matches('\'');
587
588 match field {
589 "method" => method == expected_value,
590 "path" => path == expected_value,
591 _ if field.starts_with("headers.") => {
592 let header_name = &field[8..];
593 headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
594 }
595 _ if field.starts_with("query.") => {
596 let param_name = &field[6..];
597 query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
598 }
599 _ => false,
600 }
601 }
602 else if expr.contains("=~") {
604 let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
605 if parts.len() != 2 {
606 return false;
607 }
608
609 let field = parts[0];
610 let pattern = parts[1].trim_matches('"').trim_matches('\'');
611
612 if let Ok(re) = Regex::new(pattern) {
613 match field {
614 "method" => re.is_match(method),
615 "path" => re.is_match(path),
616 _ if field.starts_with("headers.") => {
617 let header_name = &field[8..];
618 headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
619 }
620 _ if field.starts_with("query.") => {
621 let param_name = &field[6..];
622 query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
623 }
624 _ => false,
625 }
626 } else {
627 false
628 }
629 }
630 else if expr.contains("contains") {
632 let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
633 if parts.len() != 2 {
634 return false;
635 }
636
637 let field = parts[0];
638 let search_value = parts[1].trim_matches('"').trim_matches('\'');
639
640 match field {
641 "path" => path.contains(search_value),
642 _ if field.starts_with("headers.") => {
643 let header_name = &field[8..];
644 headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
645 }
646 _ if field.starts_with("body") => {
647 if let Some(body_bytes) = body {
648 let body_str = String::from_utf8_lossy(body_bytes);
649 body_str.contains(search_value)
650 } else {
651 false
652 }
653 }
654 _ => false,
655 }
656 } else {
657 tracing::warn!("Unknown custom matcher expression format: {}", expr);
659 false
660 }
661}
662
/// Runtime counters reported by the stats endpoint (see `get_stats`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds elapsed since the management state was created.
    pub uptime_seconds: u64,
    /// Total requests observed via `request_counter`.
    pub total_requests: u64,
    /// Number of registered mocks.
    pub active_mocks: usize,
    /// Number of registered mocks that are currently enabled.
    pub enabled_mocks: usize,
    /// Currently mirrors `active_mocks` (no separate route registry here).
    pub registered_routes: usize,
}
677
/// Server description returned by the config endpoint (see `get_config`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (CARGO_PKG_VERSION).
    pub version: String,
    /// Port the server was configured with.
    pub port: u16,
    /// Whether an OpenAPI spec was loaded.
    pub has_openapi_spec: bool,
    /// Filesystem path of the loaded spec, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
691
/// Shared state for all management API handlers. Cheap to clone: every field
/// is either small or behind an `Arc`.
#[derive(Clone)]
pub struct ManagementState {
    /// Registered mock definitions.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Loaded OpenAPI spec, if any.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Filesystem path the spec was loaded from, if any.
    pub spec_path: Option<String>,
    /// Port the server was configured with.
    pub port: u16,
    /// Creation time, used for uptime reporting.
    pub start_time: std::time::Instant,
    /// Request counter surfaced through `get_stats`.
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Optional SMTP mailbox registry (smtp feature).
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Optional MQTT broker handle (mqtt feature).
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Optional Kafka mock broker handle (kafka feature).
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel for observed broker messages.
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Scenario state machines keyed by scenario.
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Broadcast channel for mock CRUD events to management-WS clients.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Optional lifecycle hook registry invoked on mock CRUD.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations keyed by id.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Optional chaos API state (chaos feature).
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Live server configuration, required for bulk config updates to apply.
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
    /// Conformance-checking state (conformance feature).
    #[cfg(feature = "conformance")]
    pub conformance_state: crate::handlers::conformance::ConformanceState,
}
746
impl ManagementState {
    /// Creates a fresh management state with no mocks registered.
    ///
    /// Optional subsystems (proxy, SMTP, MQTT, Kafka, chaos, lifecycle
    /// hooks, WS broadcast, live server config) start out unset; attach
    /// them with the `with_*` builder methods below.
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Channel capacity is env-tunable; see
                // get_message_broadcast_capacity.
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
            #[cfg(feature = "conformance")]
            conformance_state: crate::handlers::conformance::ConformanceState::new(),
        }
    }

    /// Builder: attaches a lifecycle hook registry.
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Builder: attaches the management-WS broadcast channel.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Builder: attaches a proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Builder: attaches the SMTP mailbox registry (smtp feature).
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Builder: attaches the MQTT broker handle (mqtt feature).
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Builder: attaches the Kafka mock broker handle (kafka feature).
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Builder: attaches the chaos API state (chaos feature).
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Builder: attaches the live server configuration; without it, bulk
    /// config updates are validated but not applied.
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
859
860async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
862 let mocks = state.mocks.read().await;
863 Json(serde_json::json!({
864 "mocks": *mocks,
865 "total": mocks.len(),
866 "enabled": mocks.iter().filter(|m| m.enabled).count()
867 }))
868}
869
870async fn get_mock(
872 State(state): State<ManagementState>,
873 Path(id): Path<String>,
874) -> Result<Json<MockConfig>, StatusCode> {
875 let mocks = state.mocks.read().await;
876 mocks
877 .iter()
878 .find(|m| m.id == id)
879 .cloned()
880 .map(Json)
881 .ok_or(StatusCode::NOT_FOUND)
882}
883
884async fn create_mock(
886 State(state): State<ManagementState>,
887 Json(mut mock): Json<MockConfig>,
888) -> Result<Json<MockConfig>, StatusCode> {
889 let mut mocks = state.mocks.write().await;
890
891 if mock.id.is_empty() {
893 mock.id = uuid::Uuid::new_v4().to_string();
894 }
895
896 if mocks.iter().any(|m| m.id == mock.id) {
898 return Err(StatusCode::CONFLICT);
899 }
900
901 info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
902
903 if let Some(hooks) = &state.lifecycle_hooks {
905 let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
906 id: mock.id.clone(),
907 name: mock.name.clone(),
908 config: serde_json::to_value(&mock).unwrap_or_default(),
909 };
910 hooks.invoke_mock_created(&event).await;
911 }
912
913 mocks.push(mock.clone());
914
915 if let Some(tx) = &state.ws_broadcast {
917 let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
918 }
919
920 Ok(Json(mock))
921}
922
923async fn update_mock(
925 State(state): State<ManagementState>,
926 Path(id): Path<String>,
927 Json(updated_mock): Json<MockConfig>,
928) -> Result<Json<MockConfig>, StatusCode> {
929 let mut mocks = state.mocks.write().await;
930
931 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
932
933 let old_mock = mocks[position].clone();
935
936 info!("Updating mock: {}", id);
937 mocks[position] = updated_mock.clone();
938
939 if let Some(hooks) = &state.lifecycle_hooks {
941 let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
942 id: updated_mock.id.clone(),
943 name: updated_mock.name.clone(),
944 config: serde_json::to_value(&updated_mock).unwrap_or_default(),
945 };
946 hooks.invoke_mock_updated(&event).await;
947
948 if old_mock.enabled != updated_mock.enabled {
950 let state_event = if updated_mock.enabled {
951 mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
952 id: updated_mock.id.clone(),
953 }
954 } else {
955 mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
956 id: updated_mock.id.clone(),
957 }
958 };
959 hooks.invoke_mock_state_changed(&state_event).await;
960 }
961 }
962
963 if let Some(tx) = &state.ws_broadcast {
965 let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
966 }
967
968 Ok(Json(updated_mock))
969}
970
971async fn delete_mock(
973 State(state): State<ManagementState>,
974 Path(id): Path<String>,
975) -> Result<StatusCode, StatusCode> {
976 let mut mocks = state.mocks.write().await;
977
978 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
979
980 let deleted_mock = mocks[position].clone();
982
983 info!("Deleting mock: {}", id);
984 mocks.remove(position);
985
986 if let Some(hooks) = &state.lifecycle_hooks {
988 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
989 id: deleted_mock.id.clone(),
990 name: deleted_mock.name.clone(),
991 };
992 hooks.invoke_mock_deleted(&event).await;
993 }
994
995 if let Some(tx) = &state.ws_broadcast {
997 let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
998 }
999
1000 Ok(StatusCode::NO_CONTENT)
1001}
1002
/// Request body for the configuration-validation endpoint.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Candidate configuration document (arbitrary JSON).
    pub config: serde_json::Value,
    /// Source format hint: "json" (default), "yaml" or "yml".
    #[serde(default = "default_format")]
    pub format: String,
}
1012
/// Serde default for [`ValidateConfigRequest::format`].
fn default_format() -> String {
    String::from("json")
}
1016
1017async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
1019 use mockforge_core::config::ServerConfig;
1020
1021 let config_result: Result<ServerConfig, String> = match request.format.as_str() {
1022 "yaml" | "yml" => {
1023 let yaml_str = match serde_json::to_string(&request.config) {
1024 Ok(s) => s,
1025 Err(e) => {
1026 return (
1027 StatusCode::BAD_REQUEST,
1028 Json(serde_json::json!({
1029 "valid": false,
1030 "error": format!("Failed to convert to string: {}", e),
1031 "message": "Configuration validation failed"
1032 })),
1033 )
1034 .into_response();
1035 }
1036 };
1037 serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
1038 }
1039 _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
1040 };
1041
1042 match config_result {
1043 Ok(_) => Json(serde_json::json!({
1044 "valid": true,
1045 "message": "Configuration is valid"
1046 }))
1047 .into_response(),
1048 Err(e) => (
1049 StatusCode::BAD_REQUEST,
1050 Json(serde_json::json!({
1051 "valid": false,
1052 "error": format!("Invalid configuration: {}", e),
1053 "message": "Configuration validation failed"
1054 })),
1055 )
1056 .into_response(),
1057 }
1058}
1059
/// Request body for the bulk configuration update endpoint.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Top-level key/value overrides merged over the default server config.
    pub updates: serde_json::Value,
}
1066
1067async fn bulk_update_config(
1075 State(state): State<ManagementState>,
1076 Json(request): Json<BulkConfigUpdateRequest>,
1077) -> impl IntoResponse {
1078 if !request.updates.is_object() {
1080 return (
1081 StatusCode::BAD_REQUEST,
1082 Json(serde_json::json!({
1083 "error": "Invalid request",
1084 "message": "Updates must be a JSON object"
1085 })),
1086 )
1087 .into_response();
1088 }
1089
1090 use mockforge_core::config::ServerConfig;
1092
1093 let base_config = ServerConfig::default();
1095 let base_json = match serde_json::to_value(&base_config) {
1096 Ok(v) => v,
1097 Err(e) => {
1098 return (
1099 StatusCode::INTERNAL_SERVER_ERROR,
1100 Json(serde_json::json!({
1101 "error": "Internal error",
1102 "message": format!("Failed to serialize base config: {}", e)
1103 })),
1104 )
1105 .into_response();
1106 }
1107 };
1108
1109 let mut merged = base_json.clone();
1111 if let (Some(merged_obj), Some(updates_obj)) =
1112 (merged.as_object_mut(), request.updates.as_object())
1113 {
1114 for (key, value) in updates_obj {
1115 merged_obj.insert(key.clone(), value.clone());
1116 }
1117 }
1118
1119 match serde_json::from_value::<ServerConfig>(merged) {
1121 Ok(validated_config) => {
1122 if let Some(ref config_lock) = state.server_config {
1124 let mut config = config_lock.write().await;
1125 *config = validated_config;
1126 Json(serde_json::json!({
1127 "success": true,
1128 "message": "Bulk configuration update applied successfully",
1129 "updates_received": request.updates,
1130 "validated": true,
1131 "applied": true
1132 }))
1133 .into_response()
1134 } else {
1135 Json(serde_json::json!({
1136 "success": true,
1137 "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
1138 "updates_received": request.updates,
1139 "validated": true,
1140 "applied": false
1141 }))
1142 .into_response()
1143 }
1144 }
1145 Err(e) => (
1146 StatusCode::BAD_REQUEST,
1147 Json(serde_json::json!({
1148 "error": "Invalid configuration",
1149 "message": format!("Configuration validation failed: {}", e),
1150 "validated": false
1151 })),
1152 )
1153 .into_response(),
1154 }
1155}
1156
1157async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1159 let mocks = state.mocks.read().await;
1160 let request_count = *state.request_counter.read().await;
1161
1162 Json(ServerStats {
1163 uptime_seconds: state.start_time.elapsed().as_secs(),
1164 total_requests: request_count,
1165 active_mocks: mocks.len(),
1166 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1167 registered_routes: mocks.len(), })
1169}
1170
1171async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1173 Json(ServerConfig {
1174 version: env!("CARGO_PKG_VERSION").to_string(),
1175 port: state.port,
1176 has_openapi_spec: state.spec.is_some(),
1177 spec_path: state.spec_path.clone(),
1178 })
1179}
1180
1181async fn get_openapi_spec(
1183 State(state): State<ManagementState>,
1184) -> Result<Json<serde_json::Value>, StatusCode> {
1185 match &state.spec {
1186 Some(spec) => match &spec.raw_document {
1187 Some(doc) => Ok(Json(doc.clone())),
1188 None => {
1189 match serde_json::to_value(&spec.spec) {
1191 Ok(val) => Ok(Json(val)),
1192 Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
1193 }
1194 }
1195 },
1196 None => Err(StatusCode::NOT_FOUND),
1197 }
1198}
1199
1200async fn health_check() -> Json<serde_json::Value> {
1202 Json(serde_json::json!({
1203 "status": "healthy",
1204 "service": "mockforge-management",
1205 "timestamp": chrono::Utc::now().to_rfc3339()
1206 }))
1207}
1208
1209async fn get_capabilities() -> Json<serde_json::Value> {
1214 let mut features = vec!["core", "http", "management", "mocks", "proxy", "ai"];
1215
1216 #[cfg(feature = "smtp")]
1217 features.push("smtp");
1218 #[cfg(feature = "mqtt")]
1219 features.push("mqtt");
1220 #[cfg(feature = "kafka")]
1221 features.push("kafka");
1222 #[cfg(feature = "conformance")]
1223 features.push("conformance");
1224 #[cfg(feature = "behavioral-cloning")]
1225 features.push("behavioral-cloning");
1226
1227 features.extend_from_slice(&[
1229 "chaos",
1230 "network-profiles",
1231 "state-machines",
1232 "migration",
1233 "snapshot-diff",
1234 "mockai",
1235 ]);
1236
1237 Json(serde_json::json!({
1238 "features": features,
1239 "version": env!("CARGO_PKG_VERSION"),
1240 }))
1241}
1242
/// Output formats supported by the mock export endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON (default).
    Json,
    /// YAML document.
    Yaml,
}
1252
1253async fn export_mocks(
1255 State(state): State<ManagementState>,
1256 Query(params): Query<std::collections::HashMap<String, String>>,
1257) -> Result<(StatusCode, String), StatusCode> {
1258 let mocks = state.mocks.read().await;
1259
1260 let format = params
1261 .get("format")
1262 .map(|f| match f.as_str() {
1263 "yaml" | "yml" => ExportFormat::Yaml,
1264 _ => ExportFormat::Json,
1265 })
1266 .unwrap_or(ExportFormat::Json);
1267
1268 match format {
1269 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1270 .map(|json| (StatusCode::OK, json))
1271 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1272 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1273 .map(|yaml| (StatusCode::OK, yaml))
1274 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1275 }
1276}
1277
1278async fn import_mocks(
1280 State(state): State<ManagementState>,
1281 Json(mocks): Json<Vec<MockConfig>>,
1282) -> impl IntoResponse {
1283 let mut current_mocks = state.mocks.write().await;
1284 current_mocks.clear();
1285 current_mocks.extend(mocks);
1286 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1287}
1288
/// GET handler: lists all captured SMTP emails (requires the `smtp`
/// feature and an attached registry; otherwise 501).
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.smtp_registry {
        Some(smtp_registry) => match smtp_registry.get_emails() {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                })),
            ),
        },
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1313
/// GET handler: fetches a single captured SMTP email by id (404 when
/// unknown; 501 when no registry is attached).
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    match &state.smtp_registry {
        Some(smtp_registry) => match smtp_registry.get_email_by_id(&id) {
            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
            Ok(None) => (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Email not found",
                    "id": id
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve email",
                    "message": e.to_string()
                })),
            ),
        },
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1348
/// Delete every captured email from the SMTP mailbox.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            Json(serde_json::json!({
                "message": "Mailbox cleared successfully"
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            })),
        ),
    }
}
1378
/// Placeholder for mailbox export over HTTP: always answers 501 and echoes the
/// requested format back to the caller.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Previously `unwrap_or(&"json".to_string()).clone()`, which allocated a
    // default String on every call even when the parameter was present
    // (clippy: or_fun_call). Borrow first, own only the final value.
    let format = params.get("format").map(String::as_str).unwrap_or("json").to_string();
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1394
/// Search captured emails using optional query-string filters.
///
/// Supported parameters: `sender`, `recipient`, `subject`, `body`,
/// `since`/`until` (RFC 3339 timestamps; unparsable values silently drop the
/// filter), and the boolean flags `regex` / `case_sensitive` ("true" enables).
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let Some(registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    // RFC 3339 strings become UTC timestamps; parse failures mean "no filter".
    let parse_utc = |key: &str| {
        params
            .get(key)
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|dt| dt.with_timezone(&chrono::Utc))
    };
    // Boolean flags are enabled only by the literal string "true".
    let flag = |key: &str| params.get(key).map(|s| s == "true").unwrap_or(false);

    let filters = EmailSearchFilters {
        sender: params.get("sender").cloned(),
        recipient: params.get("recipient").cloned(),
        subject: params.get("subject").cloned(),
        body: params.get("body").cloned(),
        since: parse_utc("since"),
        until: parse_utc("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            })),
        ),
    }
}
1439
#[cfg(feature = "mqtt")]
/// Broker-wide MQTT statistics returned by the `/mqtt/stats` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of clients currently connected to the broker.
    pub connected_clients: usize,
    /// Number of topics reported active by the broker.
    pub active_topics: usize,
    /// Count of retained messages held by the broker.
    pub retained_messages: usize,
    /// Total number of subscriptions across all clients.
    pub total_subscriptions: usize,
}
1453
/// Report broker-wide MQTT statistics, or 503 when no broker is running.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    let client_count = broker.get_connected_clients().await.len();
    let topic_count = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients: client_count,
        active_topics: topic_count,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1474
/// List the clients currently connected to the MQTT broker.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            Json(serde_json::json!({
                "clients": clients
            }))
            .into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1487
/// List the topics the MQTT broker currently reports as active.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            Json(serde_json::json!({
                "topics": topics
            }))
            .into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1500
/// Forcibly disconnect one MQTT client, addressed by its client ID.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                .into_response()
        }
    }
}
1520
#[cfg(feature = "mqtt")]
/// JSON body for the single-message MQTT publish endpoint.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Destination topic (required).
    pub topic: String,
    /// Message payload as a UTF-8 string (required).
    pub payload: String,
    /// MQTT QoS level (0-2); defaults to 0 via `default_qos`.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether the broker should retain the message; defaults to false.
    #[serde(default)]
    pub retain: bool,
}
1538
1539#[cfg(feature = "mqtt")]
/// Serde default for `MqttPublishRequest::qos`: QoS level 0 ("at most once").
fn default_qos() -> u8 {
    0
}
1543
/// Publish one MQTT message via the management API.
///
/// Expects a JSON body with required string fields `topic` and `payload`,
/// plus optional `qos` (default 0) and `retain` (default false). A successful
/// publish is also broadcast on the management message-event channel.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    // Both topic and payload must be present as strings; reject before
    // consulting the broker at all.
    let (Some(topic), Some(payload)) = (
        request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string()),
        request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string()),
    ) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    };

    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        );
    };

    // MQTT defines exactly three QoS levels.
    if qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }

    let client_id = "mockforge-management-api".to_string();
    let publish_result =
        broker.handle_publish(&client_id, &topic, payload.as_bytes().to_vec(), qos, retain).await;

    match publish_result {
        Ok(_) => {
            // Mirror the publish onto the broadcast channel; a send error only
            // means there are no subscribers right now.
            let _ = state.message_events.send(MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload,
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            }));

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": e.to_string()
            })),
        ),
    }
}
1631
1632#[cfg(not(feature = "mqtt"))]
1633async fn publish_mqtt_message_handler(
1635 State(_state): State<ManagementState>,
1636 Json(_request): Json<serde_json::Value>,
1637) -> impl IntoResponse {
1638 (
1639 StatusCode::SERVICE_UNAVAILABLE,
1640 Json(serde_json::json!({
1641 "error": "MQTT feature not enabled",
1642 "message": "MQTT support is not compiled into this build"
1643 })),
1644 )
1645}
1646
#[cfg(feature = "mqtt")]
/// JSON body for the batch MQTT publish endpoint.
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between consecutive messages in milliseconds; defaults to 100
    /// via `default_delay`.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1657
1658#[cfg(feature = "mqtt")]
/// Serde default for `MqttBatchPublishRequest::delay_ms`: 100 ms between messages.
fn default_delay() -> u64 {
    100
}
1662
#[cfg(feature = "mqtt")]
/// Publish a batch of MQTT messages sequentially via the management API.
///
/// Body shape: `{ "messages": [{topic, payload, qos?, retain?}, ...], "delay_ms"? }`.
/// Each message is validated and published independently; failures are
/// recorded per index in the `results` array instead of aborting the batch.
/// The overall response is 200 as long as the broker exists and the batch is
/// non-empty, even if individual messages failed.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // delay_ms falls back to 100 ms, matching `default_delay`.
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    // `messages` must be present and be an array.
    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        // An empty batch is rejected outright (note: this check happens only
        // when a broker is available; without one the response is 503 below).
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        let mut results = Vec::new();
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            // Per-message fields mirror the single-publish endpoint.
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Missing fields fail only this entry, not the whole batch.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            // MQTT defines exactly three QoS levels.
            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Mirror successful publishes onto the broadcast channel;
                    // a send error only means no one is subscribed right now.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Inter-message pacing; skipped after the final message. The batch
            // was checked non-empty above, so `len() - 1` cannot underflow.
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1792
1793#[cfg(not(feature = "mqtt"))]
1794async fn publish_mqtt_batch_handler(
1796 State(_state): State<ManagementState>,
1797 Json(_request): Json<serde_json::Value>,
1798) -> impl IntoResponse {
1799 (
1800 StatusCode::SERVICE_UNAVAILABLE,
1801 Json(serde_json::json!({
1802 "error": "MQTT feature not enabled",
1803 "message": "MQTT support is not compiled into this build"
1804 })),
1805 )
1806}
1807
/// Request body for the migration-mode endpoints.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // One of "mock", "shadow", "real", or "auto" (matched case-insensitively
    // by the handlers).
    mode: String,
}
1815
1816async fn get_migration_routes(
1818 State(state): State<ManagementState>,
1819) -> Result<Json<serde_json::Value>, StatusCode> {
1820 let proxy_config = match &state.proxy_config {
1821 Some(config) => config,
1822 None => {
1823 return Ok(Json(serde_json::json!({
1824 "error": "Migration not configured. Proxy config not available."
1825 })));
1826 }
1827 };
1828
1829 let config = proxy_config.read().await;
1830 let routes = config.get_migration_routes();
1831
1832 Ok(Json(serde_json::json!({
1833 "routes": routes
1834 })))
1835}
1836
1837async fn toggle_route_migration(
1839 State(state): State<ManagementState>,
1840 Path(pattern): Path<String>,
1841) -> Result<Json<serde_json::Value>, StatusCode> {
1842 let proxy_config = match &state.proxy_config {
1843 Some(config) => config,
1844 None => {
1845 return Ok(Json(serde_json::json!({
1846 "error": "Migration not configured. Proxy config not available."
1847 })));
1848 }
1849 };
1850
1851 let mut config = proxy_config.write().await;
1852 let new_mode = match config.toggle_route_migration(&pattern) {
1853 Some(mode) => mode,
1854 None => {
1855 return Ok(Json(serde_json::json!({
1856 "error": format!("Route pattern not found: {}", pattern)
1857 })));
1858 }
1859 };
1860
1861 Ok(Json(serde_json::json!({
1862 "pattern": pattern,
1863 "mode": format!("{:?}", new_mode).to_lowercase()
1864 })))
1865}
1866
1867async fn set_route_migration_mode(
1869 State(state): State<ManagementState>,
1870 Path(pattern): Path<String>,
1871 Json(request): Json<SetMigrationModeRequest>,
1872) -> Result<Json<serde_json::Value>, StatusCode> {
1873 let proxy_config = match &state.proxy_config {
1874 Some(config) => config,
1875 None => {
1876 return Ok(Json(serde_json::json!({
1877 "error": "Migration not configured. Proxy config not available."
1878 })));
1879 }
1880 };
1881
1882 use mockforge_core::proxy::config::MigrationMode;
1883 let mode = match request.mode.to_lowercase().as_str() {
1884 "mock" => MigrationMode::Mock,
1885 "shadow" => MigrationMode::Shadow,
1886 "real" => MigrationMode::Real,
1887 "auto" => MigrationMode::Auto,
1888 _ => {
1889 return Ok(Json(serde_json::json!({
1890 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1891 })));
1892 }
1893 };
1894
1895 let mut config = proxy_config.write().await;
1896 let updated = config.update_rule_migration_mode(&pattern, mode);
1897
1898 if !updated {
1899 return Ok(Json(serde_json::json!({
1900 "error": format!("Route pattern not found: {}", pattern)
1901 })));
1902 }
1903
1904 Ok(Json(serde_json::json!({
1905 "pattern": pattern,
1906 "mode": format!("{:?}", mode).to_lowercase()
1907 })))
1908}
1909
1910async fn toggle_group_migration(
1912 State(state): State<ManagementState>,
1913 Path(group): Path<String>,
1914) -> Result<Json<serde_json::Value>, StatusCode> {
1915 let proxy_config = match &state.proxy_config {
1916 Some(config) => config,
1917 None => {
1918 return Ok(Json(serde_json::json!({
1919 "error": "Migration not configured. Proxy config not available."
1920 })));
1921 }
1922 };
1923
1924 let mut config = proxy_config.write().await;
1925 let new_mode = config.toggle_group_migration(&group);
1926
1927 Ok(Json(serde_json::json!({
1928 "group": group,
1929 "mode": format!("{:?}", new_mode).to_lowercase()
1930 })))
1931}
1932
1933async fn set_group_migration_mode(
1935 State(state): State<ManagementState>,
1936 Path(group): Path<String>,
1937 Json(request): Json<SetMigrationModeRequest>,
1938) -> Result<Json<serde_json::Value>, StatusCode> {
1939 let proxy_config = match &state.proxy_config {
1940 Some(config) => config,
1941 None => {
1942 return Ok(Json(serde_json::json!({
1943 "error": "Migration not configured. Proxy config not available."
1944 })));
1945 }
1946 };
1947
1948 use mockforge_core::proxy::config::MigrationMode;
1949 let mode = match request.mode.to_lowercase().as_str() {
1950 "mock" => MigrationMode::Mock,
1951 "shadow" => MigrationMode::Shadow,
1952 "real" => MigrationMode::Real,
1953 "auto" => MigrationMode::Auto,
1954 _ => {
1955 return Ok(Json(serde_json::json!({
1956 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1957 })));
1958 }
1959 };
1960
1961 let mut config = proxy_config.write().await;
1962 config.update_group_migration_mode(&group, mode);
1963
1964 Ok(Json(serde_json::json!({
1965 "group": group,
1966 "mode": format!("{:?}", mode).to_lowercase()
1967 })))
1968}
1969
1970async fn get_migration_groups(
1972 State(state): State<ManagementState>,
1973) -> Result<Json<serde_json::Value>, StatusCode> {
1974 let proxy_config = match &state.proxy_config {
1975 Some(config) => config,
1976 None => {
1977 return Ok(Json(serde_json::json!({
1978 "error": "Migration not configured. Proxy config not available."
1979 })));
1980 }
1981 };
1982
1983 let config = proxy_config.read().await;
1984 let groups = config.get_migration_groups();
1985
1986 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1988 .into_iter()
1989 .map(|(name, info)| {
1990 (
1991 name,
1992 serde_json::json!({
1993 "name": info.name,
1994 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1995 "route_count": info.route_count
1996 }),
1997 )
1998 })
1999 .collect();
2000
2001 Ok(Json(serde_json::json!(groups_json)))
2002}
2003
2004async fn get_migration_status(
2006 State(state): State<ManagementState>,
2007) -> Result<Json<serde_json::Value>, StatusCode> {
2008 let proxy_config = match &state.proxy_config {
2009 Some(config) => config,
2010 None => {
2011 return Ok(Json(serde_json::json!({
2012 "error": "Migration not configured. Proxy config not available."
2013 })));
2014 }
2015 };
2016
2017 let config = proxy_config.read().await;
2018 let routes = config.get_migration_routes();
2019 let groups = config.get_migration_groups();
2020
2021 let mut mock_count = 0;
2022 let mut shadow_count = 0;
2023 let mut real_count = 0;
2024 let mut auto_count = 0;
2025
2026 for route in &routes {
2027 match route.migration_mode {
2028 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
2029 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
2030 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
2031 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
2032 }
2033 }
2034
2035 Ok(Json(serde_json::json!({
2036 "total_routes": routes.len(),
2037 "mock_routes": mock_count,
2038 "shadow_routes": shadow_count,
2039 "real_routes": real_count,
2040 "auto_routes": auto_count,
2041 "total_groups": groups.len(),
2042 "migration_enabled": config.migration_enabled
2043 })))
2044}
2045
/// Request body for creating or updating a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// Route pattern the rule applies to.
    pub pattern: String,
    /// Rule side: "request" or "response" (serialized as "type"); any other
    /// value is rejected by the handlers.
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes the rule applies to; used for response-side rules only.
    /// NOTE(review): presumably an empty list means "all statuses" — confirm
    /// against the proxy matching logic.
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Transformations to apply; at least one is required when creating a rule.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active; defaults to true via `default_true`.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
2065
/// One body transformation within a proxy rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// Location within the body to transform.
    pub path: String,
    /// Replacement value.
    pub replace: String,
    /// Operation name: "replace", "add", or "remove". The handlers treat any
    /// other value (including the empty default) as "replace".
    #[serde(default)]
    pub operation: String,
}
2077
/// Rule representation returned by the proxy-rule listing endpoints.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Combined rule ID: request rules come first; response rule IDs are
    /// offset by the number of request rules.
    pub id: usize,
    /// Route pattern the rule applies to.
    pub pattern: String,
    /// Rule side: "request" or "response" (serialized as "type").
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status-code filter; always empty for request-side rules.
    pub status_codes: Vec<u16>,
    /// Transformations carried by the rule.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active.
    pub enabled: bool,
}
2095
2096async fn list_proxy_rules(
2098 State(state): State<ManagementState>,
2099) -> Result<Json<serde_json::Value>, StatusCode> {
2100 let proxy_config = match &state.proxy_config {
2101 Some(config) => config,
2102 None => {
2103 return Ok(Json(serde_json::json!({
2104 "error": "Proxy not configured. Proxy config not available."
2105 })));
2106 }
2107 };
2108
2109 let config = proxy_config.read().await;
2110
2111 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
2112
2113 for (idx, rule) in config.request_replacements.iter().enumerate() {
2115 rules.push(ProxyRuleResponse {
2116 id: idx,
2117 pattern: rule.pattern.clone(),
2118 rule_type: "request".to_string(),
2119 status_codes: Vec::new(),
2120 body_transforms: rule
2121 .body_transforms
2122 .iter()
2123 .map(|t| BodyTransformRequest {
2124 path: t.path.clone(),
2125 replace: t.replace.clone(),
2126 operation: format!("{:?}", t.operation).to_lowercase(),
2127 })
2128 .collect(),
2129 enabled: rule.enabled,
2130 });
2131 }
2132
2133 let request_count = config.request_replacements.len();
2135 for (idx, rule) in config.response_replacements.iter().enumerate() {
2136 rules.push(ProxyRuleResponse {
2137 id: request_count + idx,
2138 pattern: rule.pattern.clone(),
2139 rule_type: "response".to_string(),
2140 status_codes: rule.status_codes.clone(),
2141 body_transforms: rule
2142 .body_transforms
2143 .iter()
2144 .map(|t| BodyTransformRequest {
2145 path: t.path.clone(),
2146 replace: t.replace.clone(),
2147 operation: format!("{:?}", t.operation).to_lowercase(),
2148 })
2149 .collect(),
2150 enabled: rule.enabled,
2151 });
2152 }
2153
2154 Ok(Json(serde_json::json!({
2155 "rules": rules
2156 })))
2157}
2158
2159async fn create_proxy_rule(
2161 State(state): State<ManagementState>,
2162 Json(request): Json<ProxyRuleRequest>,
2163) -> Result<Json<serde_json::Value>, StatusCode> {
2164 let proxy_config = match &state.proxy_config {
2165 Some(config) => config,
2166 None => {
2167 return Ok(Json(serde_json::json!({
2168 "error": "Proxy not configured. Proxy config not available."
2169 })));
2170 }
2171 };
2172
2173 if request.body_transforms.is_empty() {
2175 return Ok(Json(serde_json::json!({
2176 "error": "At least one body transform is required"
2177 })));
2178 }
2179
2180 let body_transforms: Vec<BodyTransform> = request
2181 .body_transforms
2182 .iter()
2183 .map(|t| {
2184 let op = match t.operation.as_str() {
2185 "replace" => TransformOperation::Replace,
2186 "add" => TransformOperation::Add,
2187 "remove" => TransformOperation::Remove,
2188 _ => TransformOperation::Replace,
2189 };
2190 BodyTransform {
2191 path: t.path.clone(),
2192 replace: t.replace.clone(),
2193 operation: op,
2194 }
2195 })
2196 .collect();
2197
2198 let new_rule = BodyTransformRule {
2199 pattern: request.pattern.clone(),
2200 status_codes: request.status_codes.clone(),
2201 body_transforms,
2202 enabled: request.enabled,
2203 };
2204
2205 let mut config = proxy_config.write().await;
2206
2207 let rule_id = if request.rule_type == "request" {
2208 config.request_replacements.push(new_rule);
2209 config.request_replacements.len() - 1
2210 } else if request.rule_type == "response" {
2211 config.response_replacements.push(new_rule);
2212 config.request_replacements.len() + config.response_replacements.len() - 1
2213 } else {
2214 return Ok(Json(serde_json::json!({
2215 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2216 })));
2217 };
2218
2219 Ok(Json(serde_json::json!({
2220 "id": rule_id,
2221 "message": "Rule created successfully"
2222 })))
2223}
2224
2225async fn get_proxy_rule(
2227 State(state): State<ManagementState>,
2228 Path(id): Path<String>,
2229) -> Result<Json<serde_json::Value>, StatusCode> {
2230 let proxy_config = match &state.proxy_config {
2231 Some(config) => config,
2232 None => {
2233 return Ok(Json(serde_json::json!({
2234 "error": "Proxy not configured. Proxy config not available."
2235 })));
2236 }
2237 };
2238
2239 let config = proxy_config.read().await;
2240 let rule_id: usize = match id.parse() {
2241 Ok(id) => id,
2242 Err(_) => {
2243 return Ok(Json(serde_json::json!({
2244 "error": format!("Invalid rule ID: {}", id)
2245 })));
2246 }
2247 };
2248
2249 let request_count = config.request_replacements.len();
2250
2251 if rule_id < request_count {
2252 let rule = &config.request_replacements[rule_id];
2254 Ok(Json(serde_json::json!({
2255 "id": rule_id,
2256 "pattern": rule.pattern,
2257 "type": "request",
2258 "status_codes": [],
2259 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2260 "path": t.path,
2261 "replace": t.replace,
2262 "operation": format!("{:?}", t.operation).to_lowercase()
2263 })).collect::<Vec<_>>(),
2264 "enabled": rule.enabled
2265 })))
2266 } else if rule_id < request_count + config.response_replacements.len() {
2267 let response_idx = rule_id - request_count;
2269 let rule = &config.response_replacements[response_idx];
2270 Ok(Json(serde_json::json!({
2271 "id": rule_id,
2272 "pattern": rule.pattern,
2273 "type": "response",
2274 "status_codes": rule.status_codes,
2275 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2276 "path": t.path,
2277 "replace": t.replace,
2278 "operation": format!("{:?}", t.operation).to_lowercase()
2279 })).collect::<Vec<_>>(),
2280 "enabled": rule.enabled
2281 })))
2282 } else {
2283 Ok(Json(serde_json::json!({
2284 "error": format!("Rule ID {} not found", rule_id)
2285 })))
2286 }
2287}
2288
2289async fn update_proxy_rule(
2291 State(state): State<ManagementState>,
2292 Path(id): Path<String>,
2293 Json(request): Json<ProxyRuleRequest>,
2294) -> Result<Json<serde_json::Value>, StatusCode> {
2295 let proxy_config = match &state.proxy_config {
2296 Some(config) => config,
2297 None => {
2298 return Ok(Json(serde_json::json!({
2299 "error": "Proxy not configured. Proxy config not available."
2300 })));
2301 }
2302 };
2303
2304 let mut config = proxy_config.write().await;
2305 let rule_id: usize = match id.parse() {
2306 Ok(id) => id,
2307 Err(_) => {
2308 return Ok(Json(serde_json::json!({
2309 "error": format!("Invalid rule ID: {}", id)
2310 })));
2311 }
2312 };
2313
2314 let body_transforms: Vec<BodyTransform> = request
2315 .body_transforms
2316 .iter()
2317 .map(|t| {
2318 let op = match t.operation.as_str() {
2319 "replace" => TransformOperation::Replace,
2320 "add" => TransformOperation::Add,
2321 "remove" => TransformOperation::Remove,
2322 _ => TransformOperation::Replace,
2323 };
2324 BodyTransform {
2325 path: t.path.clone(),
2326 replace: t.replace.clone(),
2327 operation: op,
2328 }
2329 })
2330 .collect();
2331
2332 let updated_rule = BodyTransformRule {
2333 pattern: request.pattern.clone(),
2334 status_codes: request.status_codes.clone(),
2335 body_transforms,
2336 enabled: request.enabled,
2337 };
2338
2339 let request_count = config.request_replacements.len();
2340
2341 if rule_id < request_count {
2342 config.request_replacements[rule_id] = updated_rule;
2344 } else if rule_id < request_count + config.response_replacements.len() {
2345 let response_idx = rule_id - request_count;
2347 config.response_replacements[response_idx] = updated_rule;
2348 } else {
2349 return Ok(Json(serde_json::json!({
2350 "error": format!("Rule ID {} not found", rule_id)
2351 })));
2352 }
2353
2354 Ok(Json(serde_json::json!({
2355 "id": rule_id,
2356 "message": "Rule updated successfully"
2357 })))
2358}
2359
2360async fn delete_proxy_rule(
2362 State(state): State<ManagementState>,
2363 Path(id): Path<String>,
2364) -> Result<Json<serde_json::Value>, StatusCode> {
2365 let proxy_config = match &state.proxy_config {
2366 Some(config) => config,
2367 None => {
2368 return Ok(Json(serde_json::json!({
2369 "error": "Proxy not configured. Proxy config not available."
2370 })));
2371 }
2372 };
2373
2374 let mut config = proxy_config.write().await;
2375 let rule_id: usize = match id.parse() {
2376 Ok(id) => id,
2377 Err(_) => {
2378 return Ok(Json(serde_json::json!({
2379 "error": format!("Invalid rule ID: {}", id)
2380 })));
2381 }
2382 };
2383
2384 let request_count = config.request_replacements.len();
2385
2386 if rule_id < request_count {
2387 config.request_replacements.remove(rule_id);
2389 } else if rule_id < request_count + config.response_replacements.len() {
2390 let response_idx = rule_id - request_count;
2392 config.response_replacements.remove(response_idx);
2393 } else {
2394 return Ok(Json(serde_json::json!({
2395 "error": format!("Rule ID {} not found", rule_id)
2396 })));
2397 }
2398
2399 Ok(Json(serde_json::json!({
2400 "id": rule_id,
2401 "message": "Rule deleted successfully"
2402 })))
2403}
2404
2405async fn get_proxy_inspect(
2407 State(state): State<ManagementState>,
2408 Query(params): Query<std::collections::HashMap<String, String>>,
2409) -> Result<Json<serde_json::Value>, StatusCode> {
2410 let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2411 let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2412
2413 let proxy_config = match &state.proxy_config {
2414 Some(config) => config.read().await,
2415 None => {
2416 return Ok(Json(serde_json::json!({
2417 "error": "Proxy not configured. Proxy config not available."
2418 })));
2419 }
2420 };
2421
2422 let mut rules = Vec::new();
2423 for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2424 rules.push(serde_json::json!({
2425 "id": idx,
2426 "kind": "request",
2427 "pattern": rule.pattern,
2428 "enabled": rule.enabled,
2429 "status_codes": rule.status_codes,
2430 "transform_count": rule.body_transforms.len(),
2431 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2432 "path": t.path,
2433 "operation": t.operation,
2434 "replace": t.replace
2435 })).collect::<Vec<_>>()
2436 }));
2437 }
2438 let request_rule_count = rules.len();
2439 for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2440 rules.push(serde_json::json!({
2441 "id": request_rule_count + idx,
2442 "kind": "response",
2443 "pattern": rule.pattern,
2444 "enabled": rule.enabled,
2445 "status_codes": rule.status_codes,
2446 "transform_count": rule.body_transforms.len(),
2447 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2448 "path": t.path,
2449 "operation": t.operation,
2450 "replace": t.replace
2451 })).collect::<Vec<_>>()
2452 }));
2453 }
2454
2455 let total = rules.len();
2456 let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2457
2458 Ok(Json(serde_json::json!({
2459 "enabled": proxy_config.enabled,
2460 "target_url": proxy_config.target_url,
2461 "prefix": proxy_config.prefix,
2462 "timeout_seconds": proxy_config.timeout_seconds,
2463 "follow_redirects": proxy_config.follow_redirects,
2464 "passthrough_by_default": proxy_config.passthrough_by_default,
2465 "rules": paged_rules,
2466 "request_rule_count": request_rule_count,
2467 "response_rule_count": total.saturating_sub(request_rule_count),
2468 "limit": limit,
2469 "offset": offset,
2470 "total": total
2471 })))
2472}
2473
/// Build the management API router, wiring every admin endpoint onto a single
/// [`Router`] with the given shared [`ManagementState`].
///
/// Core routes (capabilities, health, stats, config, mock CRUD,
/// import/export, OpenAPI spec) are always registered. Protocol-specific
/// groups (SMTP, MQTT, Kafka) and optional subsystems (behavioral cloning,
/// conformance) are added only when the corresponding cargo feature is
/// enabled; the `#[cfg(not(...))] let router = router;` rebindings keep the
/// `let` chain well-formed when a feature is off.
pub fn management_router(state: ManagementState) -> Router {
    // Always-available core management endpoints.
    let router = Router::new()
        .route("/capabilities", get(get_capabilities))
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks))
        .route("/spec", get(get_openapi_spec));

    // SMTP mailbox inspection endpoints (feature-gated).
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    #[cfg(not(feature = "smtp"))]
    let router = router;

    // MQTT broker introspection, SSE message stream, and publish endpoints.
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // NOTE(review): the publish routes are registered even without the `mqtt`
    // feature — presumably non-mqtt variants of these handlers exist elsewhere
    // in the file and return an error response; confirm before removing.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka broker introspection, produce endpoints, and SSE message stream.
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Mock-to-real migration controls (always on).
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy rule CRUD and live inspection.
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI-assisted spec generation (handler itself is feature-aware and
    // returns NOT_IMPLEMENTED when the `data-faker` feature is off).
    let router = router.route("/ai/generate-spec", post(generate_ai_spec));

    // Snapshot diffing sub-router shares the same state.
    let router = router.nest(
        "/snapshot-diff",
        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
    );

    // Traffic-to-OpenAPI generation is only compiled with behavioral cloning.
    #[cfg(feature = "behavioral-cloning")]
    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));

    // Rule learning/explanations, chaos config, and network profiles.
    let router = router
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    // Conformance checking is nested as a service with its own state slice.
    #[cfg(feature = "conformance")]
    let router = router.nest_service(
        "/conformance",
        crate::handlers::conformance::conformance_router(state.conformance_state.clone()),
    );
    #[cfg(not(feature = "conformance"))]
    let router = router;

    router.with_state(state)
}
2588
#[cfg(feature = "kafka")]
/// Aggregate statistics for the mock Kafka broker, serialized as the body of
/// `GET /kafka/stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics currently known to the broker.
    pub topics: usize,
    /// Total partition count summed across all topics.
    pub partitions: usize,
    /// Number of registered consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced, taken from the broker metrics snapshot.
    pub messages_produced: u64,
    /// Total messages consumed, taken from the broker metrics snapshot.
    pub messages_consumed: u64,
}
2604
#[cfg(feature = "kafka")]
/// Summary row for one topic in the `GET /kafka/topics` listing.
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions in the topic.
    pub partitions: usize,
    /// Configured replication factor (from the topic config).
    pub replication_factor: i32,
}
2613
#[cfg(feature = "kafka")]
/// Summary row for one consumer group in the `GET /kafka/groups` listing.
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier.
    pub group_id: String,
    /// Number of members currently in the group.
    pub members: usize,
    /// Group state; the handler always reports "Stable" (the mock broker
    /// does not model rebalancing).
    pub state: String,
}
2622
/// `GET /kafka/stats` — report aggregate broker statistics, or 503 when no
/// Kafka broker is configured in the management state.
#[cfg(feature = "kafka")]
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    // Bail out early when the broker is absent.
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;

    // Sum partitions over every topic for the aggregate count.
    let partition_total: usize = topics.values().map(|topic| topic.partitions.len()).sum();

    // Produced/consumed counters come from a point-in-time metrics snapshot.
    let snapshot = broker.metrics().snapshot();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: partition_total,
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    })
    .into_response()
}
2655
/// `GET /kafka/topics` — list all topics with partition counts, or 503 when
/// no Kafka broker is configured.
#[cfg(feature = "kafka")]
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.kafka_broker {
        Some(broker) => {
            let topics = broker.topics.read().await;

            // Build one summary row per topic.
            let mut topic_list = Vec::with_capacity(topics.len());
            for (name, topic) in topics.iter() {
                topic_list.push(KafkaTopicInfo {
                    name: name.clone(),
                    partitions: topic.partitions.len(),
                    replication_factor: topic.config.replication_factor as i32,
                });
            }

            Json(serde_json::json!({
                "topics": topic_list
            }))
            .into_response()
        }
        None => (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response(),
    }
}
2685
/// `GET /kafka/topics/{topic}` — detail for one topic including a
/// per-partition breakdown; 404 if the topic is unknown, 503 if no broker.
#[cfg(feature = "kafka")]
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // Single-node mock: broker 0 leads every partition and is the sole
    // replica.
    let partitions_detail: Vec<_> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2727
/// `GET /kafka/groups` — list consumer groups known to the broker, or 503
/// when no Kafka broker is configured.
#[cfg(feature = "kafka")]
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;

    let mut group_list = Vec::new();
    for (group_id, group) in consumer_groups.groups().iter() {
        group_list.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // The mock broker does not model rebalancing, so every group is
            // reported as stable.
            state: "Stable".to_string(),
        });
    }

    Json(serde_json::json!({
        "groups": group_list
    }))
    .into_response()
}
2758
#[cfg(feature = "kafka")]
/// `GET /kafka/groups/{group_id}` — detail for one consumer group: member
/// list with partition assignments and committed offsets. 404 if the group
/// is unknown, 503 when no Kafka broker is configured.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let consumer_groups = broker.consumer_groups.read().await;
        if let Some(group) = consumer_groups.groups().get(&group_id) {
            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                // Hard-coded: the mock broker does not track rebalance state.
                "state": "Stable",
                // One entry per member with its topic/partition assignments.
                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
                    "member_id": member_id,
                    "client_id": member.client_id,
                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })).collect::<Vec<_>>()
                })).collect::<Vec<_>>(),
                // Committed offsets, keyed by (topic, partition) in the store.
                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
                    "topic": topic,
                    "partition": partition,
                    "offset": offset
                })).collect::<Vec<_>>()
            })).into_response()
        } else {
            (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Consumer group not found",
                    "group_id": group_id
                })),
            )
                .into_response()
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2807
#[cfg(feature = "kafka")]
/// Request body for `POST /kafka/produce` (also used per-item in the batch
/// endpoint).
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Destination topic; created on demand if it does not exist.
    pub topic: String,
    /// Optional message key; also used for partition assignment when no
    /// explicit partition is given.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload (stored as UTF-8 bytes).
    pub value: String,
    /// Optional explicit partition; validated against the topic's partition
    /// count by the handler.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional message headers (string values, stored as bytes).
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2828
/// `POST /kafka/produce` — produce a single message to a Kafka topic.
///
/// The topic is auto-created with default configuration on first use. When
/// no partition is supplied, one is assigned from the message key. On
/// success the broker's produce counter is bumped and a
/// `MessageEvent::Kafka` is broadcast to SSE stream subscribers.
///
/// Returns 400 for an out-of-range partition, 500 on a produce failure, and
/// 503 when no Kafka broker is configured.
#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let mut topics = broker.topics.write().await;

        // Auto-create the topic with default config if it does not exist.
        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
            mockforge_kafka::topics::Topic::new(
                request.topic.clone(),
                mockforge_kafka::topics::TopicConfig::default(),
            )
        });

        // Explicit partition wins; otherwise derive one from the key.
        let partition_id = if let Some(partition) = request.partition {
            partition
        } else {
            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
        };

        // Reject out-of-range partitions before touching the log.
        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid partition",
                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                })),
            )
                .into_response();
        }

        // Build the stored message from borrowed request data (no interim
        // clones; the event below clones what it needs). `offset: 0` is a
        // placeholder — the real offset is returned by `produce`.
        let message = mockforge_kafka::partitions::KafkaMessage {
            offset: 0,
            timestamp: chrono::Utc::now().timestamp_millis(),
            key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
            value: request.value.as_bytes().to_vec(),
            headers: request
                .headers
                .as_ref()
                .into_iter()
                .flatten()
                .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
                .collect(),
        };

        match topic_entry.produce(partition_id, message).await {
            Ok(offset) => {
                // `broker` is already in scope; the previous re-check of
                // `state.kafka_broker` here was redundant.
                broker.metrics().record_messages_produced(1);

                // Broadcast to SSE subscribers; a send error just means no
                // one is listening, so it is deliberately ignored.
                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: request.topic.clone(),
                    key: request.key.clone(),
                    value: request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: request.headers.clone(),
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message produced to topic '{}'", request.topic),
                    "topic": request.topic,
                    "partition": partition_id,
                    "offset": offset
                }))
                .into_response()
            }
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to produce message",
                    "message": e.to_string()
                })),
            )
                .into_response(),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2933
#[cfg(feature = "kafka")]
/// Request body for `POST /kafka/produce/batch`.
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce in order; must be non-empty.
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay inserted between consecutive messages, in milliseconds.
    // `default_delay` is defined elsewhere in this file.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2944
/// `POST /kafka/produce/batch` — produce a sequence of messages, optionally
/// pacing them with `delay_ms` between sends.
///
/// Each message is handled independently: per-message failures (invalid
/// partition, produce error) are reported in the `results` array without
/// aborting the batch. The response summarizes succeeded/failed counts.
/// Returns 400 for an empty batch and 503 when no broker is configured.
#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
                .into_response();
        }

        let mut results = Vec::new();

        for (index, msg_request) in request.messages.iter().enumerate() {
            // Re-acquire the write lock per message so it is NOT held while
            // sleeping between sends (keeps the broker responsive).
            let mut topics = broker.topics.write().await;

            // Auto-create the topic with default config if needed.
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            // Explicit partition wins; otherwise derive one from the key.
            let partition_id = if let Some(partition) = msg_request.partition {
                partition
            } else {
                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
            };

            // Record out-of-range partitions as per-message failures.
            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                continue;
            }

            // Build the stored message from borrowed request data; the real
            // offset is assigned by `produce`, so 0 is a placeholder.
            let message = mockforge_kafka::partitions::KafkaMessage {
                offset: 0,
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.as_ref().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .as_ref()
                    .into_iter()
                    .flatten()
                    .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
                    .collect(),
            };

            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    // `broker` is already in scope; the old inner re-check of
                    // `state.kafka_broker` was redundant.
                    broker.metrics().record_messages_produced(1);

                    // Broadcast to SSE subscribers; send errors mean "no
                    // listeners" and are intentionally ignored.
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }

            // Pace the batch; no delay after the final message.
            if index < request.messages.len() - 1 && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        // `success: true` reflects that the batch was processed; individual
        // failures are surfaced via `failed` and the `results` entries.
        Json(serde_json::json!({
            "success": true,
            "total": request.messages.len(),
            "succeeded": success_count,
            "failed": request.messages.len() - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
3073
#[cfg(feature = "mqtt")]
/// `GET /mqtt/messages/stream` — Server-Sent Events stream of MQTT messages.
///
/// Subscribes to the shared broadcast channel and forwards each MQTT event
/// as an SSE `mqtt_message` event. An optional `topic` query parameter
/// filters by substring match on the topic. Kafka events on the shared
/// channel are skipped. A keep-alive comment is sent every 15 seconds.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // unfold drives the broadcast receiver: each step loops until it has an
    // event worth emitting, then yields (sse_event, receiver) to continue.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Substring topic filter, if one was requested.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failure just drops the event and the
                        // loop fetches the next one.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // The broadcast channel is shared with Kafka; ignore
                    // those events on this stream.
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        continue;
                    }
                    // Channel closed: end the SSE stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Slow consumer: log how many messages were dropped and
                    // keep going from the current position.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3136
#[cfg(feature = "kafka")]
/// `GET /kafka/messages/stream` — Server-Sent Events stream of produced
/// Kafka messages.
///
/// Mirror of `mqtt_messages_stream` for the Kafka side of the shared
/// broadcast channel: forwards each Kafka event as an SSE `kafka_message`
/// event, optionally filtered by a `topic` substring query parameter, and
/// skips MQTT events. Keep-alives are sent every 15 seconds.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // unfold drives the broadcast receiver: loop until an emittable event
    // arrives, then yield (sse_event, receiver) to continue the stream.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    // The channel is shared with MQTT; ignore those events.
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Substring topic filter, if one was requested.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failure drops the event; the loop
                        // fetches the next one.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Channel closed: end the SSE stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Slow consumer: log the drop count and continue.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3200
/// Request body for `POST /ai/generate-spec`.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the API to generate.
    pub query: String,
    /// Target spec format: "openapi", "graphql", or "asyncapi"; anything
    /// else falls back to OpenAPI 3.0 in the handler.
    pub spec_type: String,
    /// OpenAPI version to target; the handler defaults to "3.0.0" when
    /// omitted.
    pub api_version: Option<String>,
}
3213
/// Request body for `POST /mockai/generate-openapi` (behavioral cloning).
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to `./recordings.db` in the
    /// current directory when omitted.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Only analyze exchanges recorded at/after this RFC 3339 timestamp.
    #[serde(default)]
    pub since: Option<String>,
    /// Only analyze exchanges recorded at/before this RFC 3339 timestamp.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern filter applied to recorded exchanges.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence for inferred paths; defaults to 0.7.
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
3233
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    // Keep the threshold named so the value is self-documenting.
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
3237
#[cfg(feature = "data-faker")]
/// `POST /ai/generate-spec` — generate an API specification from a
/// natural-language description using the configured RAG/LLM backend.
///
/// Provider, endpoint, model, and generation parameters are all taken from
/// `MOCKFORGE_RAG_*` environment variables (with `OPENAI_API_KEY` as an
/// API-key fallback). Returns 503 when no API key is configured and 500 on
/// engine init or generation failure.
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::DocumentStorage,
    };
    use std::sync::Arc;

    // API key: MOCKFORGE_RAG_API_KEY takes precedence, then OPENAI_API_KEY.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Provider selection; unknown values silently fall back to OpenAI.
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Endpoint and model each default per provider unless overridden.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Generation parameters; unparsable env values fall back to defaults.
    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key,
        model,
        max_tokens: std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
            .unwrap_or_else(|_| "4096".to_string())
            .parse()
            .unwrap_or(4096),
        temperature: std::env::var("MOCKFORGE_RAG_TEMPERATURE")
            .unwrap_or_else(|_| "0.3".to_string())
            .parse()
            .unwrap_or(0.3),
        timeout_secs: std::env::var("MOCKFORGE_RAG_TIMEOUT")
            .unwrap_or_else(|_| "60".to_string())
            .parse()
            .unwrap_or(60),
        max_context_length: std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
            .unwrap_or_else(|_| "4000".to_string())
            .parse()
            .unwrap_or(4000),
        ..Default::default()
    };

    // Unknown spec types fall back to OpenAPI 3.0.
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    // Prompt instructs the model to emit the bare spec (YAML, or GraphQL
    // SDL for GraphQL) with no surrounding prose.
    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // One-shot generation needs no persistent document store, so an
    // in-memory storage backend is sufficient.
    use mockforge_data::rag::storage::InMemoryStorage;
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Strip any wrapper text the model added around the spec;
            // extract_* helpers are defined elsewhere in this file.
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3410
3411#[cfg(not(feature = "data-faker"))]
3412async fn generate_ai_spec(
3413 State(_state): State<ManagementState>,
3414 Json(_request): Json<GenerateSpecRequest>,
3415) -> impl IntoResponse {
3416 (
3417 StatusCode::NOT_IMPLEMENTED,
3418 Json(serde_json::json!({
3419 "error": "AI features not enabled",
3420 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3421 })),
3422 )
3423 .into_response()
3424}
3425
/// `POST /mockai/generate-openapi` — infer an OpenAPI spec from recorded
/// HTTP traffic in the recorder database.
///
/// Flow: open the recorder DB (from `database_path` or `./recordings.db`),
/// parse optional `since`/`until` RFC 3339 bounds, query up to 1000
/// matching exchanges, convert them to the generator's exchange type, and
/// run the OpenAPI spec generator with the requested minimum confidence.
///
/// Errors: 400 for a bad DB path or timestamp, 404 when no exchanges match,
/// 500 for query/generation/serialization failures.
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Resolve the recorder database path, defaulting to ./recordings.db.
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Parse the optional lower time bound. The message names the JSON field
    // ('since') — the old "--since" wording was copied from the CLI.
    let since_dt = if let Some(ref since_str) = request.since {
        match DateTime::parse_from_rfc3339(since_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid 'since' format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    // Parse the optional upper time bound (same convention as `since`).
    let until_dt = if let Some(ref until_str) = request.until {
        match DateTime::parse_from_rfc3339(until_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid 'until' format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    // Cap the analysis at 1000 exchanges to bound work per request.
    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000),
    };

    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Bridge the recorder's exchange type to the generator's (field-for-field
    // copy; the two crates declare structurally identical types).
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    // Generator config: caller-supplied confidence floor plus the default
    // behavior model.
    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prefer the raw document when available; otherwise serialize the typed
    // spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3618
3619async fn list_rule_explanations(
3621 State(state): State<ManagementState>,
3622 Query(params): Query<std::collections::HashMap<String, String>>,
3623) -> impl IntoResponse {
3624 use mockforge_core::intelligent_behavior::RuleType;
3625
3626 let explanations = state.rule_explanations.read().await;
3627 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3628
3629 if let Some(rule_type_str) = params.get("rule_type") {
3631 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3632 explanations_vec.retain(|e| e.rule_type == rule_type);
3633 }
3634 }
3635
3636 if let Some(min_confidence_str) = params.get("min_confidence") {
3638 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3639 explanations_vec.retain(|e| e.confidence >= min_confidence);
3640 }
3641 }
3642
3643 explanations_vec.sort_by(|a, b| {
3645 b.confidence
3646 .partial_cmp(&a.confidence)
3647 .unwrap_or(std::cmp::Ordering::Equal)
3648 .then_with(|| b.generated_at.cmp(&a.generated_at))
3649 });
3650
3651 Json(serde_json::json!({
3652 "explanations": explanations_vec,
3653 "total": explanations_vec.len(),
3654 }))
3655 .into_response()
3656}
3657
3658async fn get_rule_explanation(
3660 State(state): State<ManagementState>,
3661 Path(rule_id): Path<String>,
3662) -> impl IntoResponse {
3663 let explanations = state.rule_explanations.read().await;
3664
3665 match explanations.get(&rule_id) {
3666 Some(explanation) => Json(serde_json::json!({
3667 "explanation": explanation,
3668 }))
3669 .into_response(),
3670 None => (
3671 StatusCode::NOT_FOUND,
3672 Json(serde_json::json!({
3673 "error": "Rule explanation not found",
3674 "message": format!("No explanation found for rule ID: {}", rule_id)
3675 })),
3676 )
3677 .into_response(),
3678 }
3679}
3680
/// Request body for the learn-from-examples endpoint.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn rules from; must be non-empty.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional behavior configuration as raw JSON (deserialized into
    /// `IntelligentBehaviorConfig`); invalid or missing values fall back to defaults.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3690
/// A single request/response example used for rule learning.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request as loose JSON; recognized keys: `method`, `path`, `body`,
    /// `query_params`, `headers`.
    pub request: serde_json::Value,
    /// Response as loose JSON; recognized keys: `status_code` (or `status`) and `body`.
    pub response: serde_json::Value,
}
3699
3700async fn learn_from_examples(
3705 State(state): State<ManagementState>,
3706 Json(request): Json<LearnFromExamplesRequest>,
3707) -> impl IntoResponse {
3708 use mockforge_core::intelligent_behavior::{
3709 config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3710 rule_generator::{ExamplePair, RuleGenerator},
3711 };
3712
3713 if request.examples.is_empty() {
3714 return (
3715 StatusCode::BAD_REQUEST,
3716 Json(serde_json::json!({
3717 "error": "No examples provided",
3718 "message": "At least one example pair is required"
3719 })),
3720 )
3721 .into_response();
3722 }
3723
3724 let example_pairs: Result<Vec<ExamplePair>, String> = request
3726 .examples
3727 .into_iter()
3728 .enumerate()
3729 .map(|(idx, ex)| {
3730 let method = ex
3732 .request
3733 .get("method")
3734 .and_then(|v| v.as_str())
3735 .map(|s| s.to_string())
3736 .unwrap_or_else(|| "GET".to_string());
3737 let path = ex
3738 .request
3739 .get("path")
3740 .and_then(|v| v.as_str())
3741 .map(|s| s.to_string())
3742 .unwrap_or_else(|| "/".to_string());
3743 let request_body = ex.request.get("body").cloned();
3744 let query_params = ex
3745 .request
3746 .get("query_params")
3747 .and_then(|v| v.as_object())
3748 .map(|obj| {
3749 obj.iter()
3750 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3751 .collect()
3752 })
3753 .unwrap_or_default();
3754 let headers = ex
3755 .request
3756 .get("headers")
3757 .and_then(|v| v.as_object())
3758 .map(|obj| {
3759 obj.iter()
3760 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3761 .collect()
3762 })
3763 .unwrap_or_default();
3764
3765 let status = ex
3767 .response
3768 .get("status_code")
3769 .or_else(|| ex.response.get("status"))
3770 .and_then(|v| v.as_u64())
3771 .map(|n| n as u16)
3772 .unwrap_or(200);
3773 let response_body = ex.response.get("body").cloned();
3774
3775 Ok(ExamplePair {
3776 method,
3777 path,
3778 request: request_body,
3779 status,
3780 response: response_body,
3781 query_params,
3782 headers,
3783 metadata: {
3784 let mut meta = std::collections::HashMap::new();
3785 meta.insert("source".to_string(), "api".to_string());
3786 meta.insert("example_index".to_string(), idx.to_string());
3787 meta
3788 },
3789 })
3790 })
3791 .collect();
3792
3793 let example_pairs = match example_pairs {
3794 Ok(pairs) => pairs,
3795 Err(e) => {
3796 return (
3797 StatusCode::BAD_REQUEST,
3798 Json(serde_json::json!({
3799 "error": "Invalid examples",
3800 "message": e
3801 })),
3802 )
3803 .into_response();
3804 }
3805 };
3806
3807 let behavior_config = if let Some(config_json) = request.config {
3809 serde_json::from_value(config_json)
3811 .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3812 .behavior_model
3813 } else {
3814 BehaviorModelConfig::default()
3815 };
3816
3817 let generator = RuleGenerator::new(behavior_config);
3819
3820 let (rules, explanations) =
3822 match generator.generate_rules_with_explanations(example_pairs).await {
3823 Ok(result) => result,
3824 Err(e) => {
3825 return (
3826 StatusCode::INTERNAL_SERVER_ERROR,
3827 Json(serde_json::json!({
3828 "error": "Rule generation failed",
3829 "message": format!("Failed to generate rules: {}", e)
3830 })),
3831 )
3832 .into_response();
3833 }
3834 };
3835
3836 {
3838 let mut stored_explanations = state.rule_explanations.write().await;
3839 for explanation in &explanations {
3840 stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3841 }
3842 }
3843
3844 let response = serde_json::json!({
3846 "success": true,
3847 "rules_generated": {
3848 "consistency_rules": rules.consistency_rules.len(),
3849 "schemas": rules.schemas.len(),
3850 "state_machines": rules.state_transitions.len(),
3851 "system_prompt": !rules.system_prompt.is_empty(),
3852 },
3853 "explanations": explanations.iter().map(|e| serde_json::json!({
3854 "rule_id": e.rule_id,
3855 "rule_type": e.rule_type,
3856 "confidence": e.confidence,
3857 "reasoning": e.reasoning,
3858 })).collect::<Vec<_>>(),
3859 "total_explanations": explanations.len(),
3860 });
3861
3862 Json(response).into_response()
3863}
3864
3865#[cfg(feature = "data-faker")]
/// Extract a YAML spec from (possibly fenced) LLM output.
///
/// Preference order: the first complete ```` ```yaml ```` fenced block, then
/// the first complete generic ```` ``` ```` fenced block, otherwise the whole
/// text. The result is always trimmed of surrounding whitespace.
fn extract_yaml_spec(text: &str) -> String {
    // Returns the trimmed contents between `opener` and the next closing
    // fence, or None if either delimiter is missing.
    fn fenced(text: &str, opener: &str) -> Option<String> {
        let start = text.find(opener)?;
        let body = text[start + opener.len()..].trim_start();
        let end = body.find("```")?;
        Some(body[..end].trim().to_string())
    }

    fenced(text, "```yaml")
        .or_else(|| fenced(text, "```"))
        .unwrap_or_else(|| text.trim().to_string())
}
3889
3890#[cfg(feature = "data-faker")]
/// Extract a GraphQL schema from (possibly fenced) LLM output.
///
/// Preference order: the first complete ```` ```graphql ```` fenced block,
/// then the first complete generic ```` ``` ```` fenced block, otherwise the
/// whole text. The result is always trimmed of surrounding whitespace.
fn extract_graphql_schema(text: &str) -> String {
    // Returns the trimmed contents between `opener` and the next closing
    // fence, or None if either delimiter is missing.
    fn fenced(text: &str, opener: &str) -> Option<String> {
        let start = text.find(opener)?;
        let body = text[start + opener.len()..].trim_start();
        let end = body.find("```")?;
        Some(body[..end].trim().to_string())
    }

    fenced(text, "```graphql")
        .or_else(|| fenced(text, "```"))
        .unwrap_or_else(|| text.trim().to_string())
}
3914
3915async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3919 #[cfg(feature = "chaos")]
3920 {
3921 if let Some(chaos_state) = &_state.chaos_api_state {
3922 let config = chaos_state.config.read().await;
3923 Json(serde_json::json!({
3925 "enabled": config.enabled,
3926 "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3927 "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3928 "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3929 "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3930 }))
3931 .into_response()
3932 } else {
3933 Json(serde_json::json!({
3935 "enabled": false,
3936 "latency": null,
3937 "fault_injection": null,
3938 "rate_limit": null,
3939 "traffic_shaping": null,
3940 }))
3941 .into_response()
3942 }
3943 }
3944 #[cfg(not(feature = "chaos"))]
3945 {
3946 Json(serde_json::json!({
3948 "enabled": false,
3949 "latency": null,
3950 "fault_injection": null,
3951 "rate_limit": null,
3952 "traffic_shaping": null,
3953 }))
3954 .into_response()
3955 }
3956}
3957
/// Partial update for the chaos engineering configuration.
///
/// Every field is optional; absent fields leave the corresponding setting
/// untouched. The JSON sub-objects are deserialized into their typed configs
/// when applied.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master switch for chaos injection.
    pub enabled: Option<bool>,
    /// Latency injection settings (raw JSON for `LatencyConfig`).
    pub latency: Option<serde_json::Value>,
    /// Fault injection settings (raw JSON for `FaultInjectionConfig`).
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting settings (raw JSON for `RateLimitConfig`).
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping settings (raw JSON for `TrafficShapingConfig`).
    pub traffic_shaping: Option<serde_json::Value>,
}
3972
3973async fn update_chaos_config(
3975 State(_state): State<ManagementState>,
3976 Json(_config_update): Json<ChaosConfigUpdate>,
3977) -> impl IntoResponse {
3978 #[cfg(feature = "chaos")]
3979 {
3980 if let Some(chaos_state) = &_state.chaos_api_state {
3981 use mockforge_chaos::config::{
3982 FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig,
3983 };
3984
3985 let mut config = chaos_state.config.write().await;
3986
3987 if let Some(enabled) = _config_update.enabled {
3989 config.enabled = enabled;
3990 }
3991
3992 if let Some(latency_json) = _config_update.latency {
3994 if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3995 config.latency = Some(latency);
3996 }
3997 }
3998
3999 if let Some(fault_json) = _config_update.fault_injection {
4001 if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
4002 config.fault_injection = Some(fault);
4003 }
4004 }
4005
4006 if let Some(rate_json) = _config_update.rate_limit {
4008 if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
4009 config.rate_limit = Some(rate);
4010 }
4011 }
4012
4013 if let Some(traffic_json) = _config_update.traffic_shaping {
4015 if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
4016 config.traffic_shaping = Some(traffic);
4017 }
4018 }
4019
4020 drop(config);
4023
4024 info!("Chaos configuration updated successfully");
4025 Json(serde_json::json!({
4026 "success": true,
4027 "message": "Chaos configuration updated and applied"
4028 }))
4029 .into_response()
4030 } else {
4031 (
4032 StatusCode::SERVICE_UNAVAILABLE,
4033 Json(serde_json::json!({
4034 "success": false,
4035 "error": "Chaos API not available",
4036 "message": "Chaos engineering is not enabled or configured"
4037 })),
4038 )
4039 .into_response()
4040 }
4041 }
4042 #[cfg(not(feature = "chaos"))]
4043 {
4044 (
4045 StatusCode::NOT_IMPLEMENTED,
4046 Json(serde_json::json!({
4047 "success": false,
4048 "error": "Chaos feature not enabled",
4049 "message": "Chaos engineering feature is not compiled into this build"
4050 })),
4051 )
4052 .into_response()
4053 }
4054}
4055
4056async fn list_network_profiles() -> impl IntoResponse {
4060 use mockforge_core::network_profiles::NetworkProfileCatalog;
4061
4062 let catalog = NetworkProfileCatalog::default();
4063 let profiles: Vec<serde_json::Value> = catalog
4064 .list_profiles_with_description()
4065 .iter()
4066 .map(|(name, description)| {
4067 serde_json::json!({
4068 "name": name,
4069 "description": description,
4070 })
4071 })
4072 .collect();
4073
4074 Json(serde_json::json!({
4075 "profiles": profiles
4076 }))
4077 .into_response()
4078}
4079
/// Request body for applying a named network profile.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of a profile to look up in the `NetworkProfileCatalog`.
    pub profile_name: String,
}
4086
/// Apply a named network profile to the running server.
///
/// Looks the profile up in the built-in catalog and writes its traffic-shaping
/// parameters into two places: the shared server config (when attached to
/// state) and, when the `chaos` feature is compiled in, the live chaos API
/// state. Returns 404 when the profile name is unknown.
async fn apply_network_profile(
    State(state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        if let Some(server_config) = &state.server_config {
            let mut config = server_config.write().await;

            use mockforge_core::config::NetworkShapingConfig;

            // Shaping is enabled if either bandwidth limiting or burst loss is
            // enabled in the profile. `max_bytes_per_sec * 8` converts bytes/s
            // to bits/s for the bps field.
            let network_shaping = NetworkShapingConfig {
                enabled: profile.traffic_shaping.bandwidth.enabled
                    || profile.traffic_shaping.burst_loss.enabled,
                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8,
                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                // Hard-coded connection cap; not derived from the profile.
                max_connections: 1000,
            };

            // Reuse an existing chaos config if present; otherwise create one
            // that enables only traffic shaping.
            if let Some(ref mut chaos) = config.observability.chaos {
                chaos.traffic_shaping = Some(network_shaping);
            } else {
                use mockforge_core::config::ChaosEngConfig;
                config.observability.chaos = Some(ChaosEngConfig {
                    enabled: true,
                    latency: None,
                    fault_injection: None,
                    rate_limit: None,
                    traffic_shaping: Some(network_shaping),
                    scenario: None,
                });
            }

            info!("Network profile '{}' applied to server configuration", request.profile_name);
        } else {
            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
        }

        // Mirror the profile into the live chaos API state so it takes effect
        // immediately (chaos builds only).
        #[cfg(feature = "chaos")]
        {
            if let Some(chaos_state) = &state.chaos_api_state {
                use mockforge_chaos::config::TrafficShapingConfig;

                let mut chaos_config = chaos_state.config.write().await;
                // Same mapping as above; bytes/s -> bits/s via * 8. Note the
                // chaos-side config uses max_connections 0 here, unlike the
                // server-side 1000 above.
                let chaos_traffic_shaping = TrafficShapingConfig {
                    enabled: profile.traffic_shaping.bandwidth.enabled
                        || profile.traffic_shaping.burst_loss.enabled,
                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8,
                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                    max_connections: 0,
                    connection_timeout_ms: 30000,
                };
                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
                // Release the write lock before logging.
                chaos_config.enabled = true; drop(chaos_config);
                info!("Network profile '{}' applied to chaos API state", request.profile_name);
            }
        }

        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
            .into_response()
    }
}
4180
4181pub fn management_router_with_ui_builder(
4183 state: ManagementState,
4184 server_config: mockforge_core::config::ServerConfig,
4185) -> Router {
4186 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4187
4188 let management = management_router(state);
4190
4191 let ui_builder_state = UIBuilderState::new(server_config);
4193 let ui_builder = create_ui_builder_router(ui_builder_state);
4194
4195 management.nest("/ui-builder", ui_builder)
4197}
4198
4199pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4201 use crate::spec_import::{spec_import_router, SpecImportState};
4202
4203 let management = management_router(state);
4205
4206 Router::new()
4208 .merge(management)
4209 .merge(spec_import_router(SpecImportState::new()))
4210}
4211
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal mock with no request-matching rules and status 200.
    fn simple_mock(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        body: serde_json::Value,
        enabled: bool,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse { body, headers: None },
            enabled,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// Builds an enabled POST /xml mock that matches on the given XPath.
    fn xpath_mock(id: &str, name: &str, xpath: &str) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some(xpath.to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = simple_mock(
            "test-1",
            "Test Mock",
            "GET",
            "/test",
            serde_json::json!({"message": "test"}),
            true,
        );

        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        {
            let mut mocks = state.mocks.write().await;
            mocks.push(simple_mock("1", "Mock 1", "GET", "/test1", serde_json::json!({}), true));
            // The disabled mock also uses a 201 status to vary the fixtures.
            let mut disabled =
                simple_mock("2", "Mock 2", "POST", "/test2", serde_json::json!({}), false);
            disabled.status_code = Some(201);
            mocks.push(disabled);
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }

    #[test]
    fn test_mock_matches_request_with_xpath_absolute_path() {
        let mock = xpath_mock("xpath-1", "XPath Match", "/root/order/id");

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    #[test]
    fn test_mock_matches_request_with_xpath_text_predicate() {
        let mock = xpath_mock("xpath-2", "XPath Predicate Match", "//order/id[text()='123']");

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    #[test]
    fn test_mock_matches_request_with_xpath_no_match() {
        let mock = xpath_mock("xpath-3", "XPath No Match", "//order/id[text()='456']");

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }
}