1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3use axum::{
8 extract::{Path, Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Json},
11 routing::{delete, get, post, put},
12 Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default capacity of the broadcast channel that fans out broker message
/// events to subscribers; overridable via the
/// `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` environment variable.
#[cfg(any(feature = "mqtt", feature = "kafka"))]
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;
32
/// Resolves the message-event broadcast channel capacity.
///
/// Reads `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` from the environment and
/// falls back to [`DEFAULT_MESSAGE_BROADCAST_CAPACITY`] when the variable is
/// unset or does not parse as a `usize`.
#[cfg(any(feature = "mqtt", feature = "kafka"))]
fn get_message_broadcast_capacity() -> usize {
    match std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY") {
        Ok(raw) => raw.parse::<usize>().unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY),
        Err(_) => DEFAULT_MESSAGE_BROADCAST_CAPACITY,
    }
}
41
/// A message observed on one of the mock message brokers, tagged by protocol
/// so it serializes as `{"protocol": "...", "data": {...}}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// A message published through the MQTT broker.
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A message produced to the mock Kafka broker.
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
54
/// Snapshot of a single MQTT message for event streaming.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload rendered as a string.
    pub payload: String,
    /// MQTT quality-of-service level (0, 1, or 2).
    pub qos: u8,
    /// Whether the message carried the retain flag.
    pub retain: bool,
    /// String-encoded time the message was observed.
    pub timestamp: String,
}
70
/// Snapshot of a single Kafka record for event streaming.
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the record was produced to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value rendered as a string.
    pub value: String,
    /// Partition the record landed in.
    pub partition: i32,
    /// Offset of the record within its partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// String-encoded time the record was observed.
    pub timestamp: String,
}
83
/// Definition of a single HTTP mock managed through the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique identifier; a UUID is generated server-side when left empty on creation.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable name for the mock.
    pub name: String,
    /// HTTP method to match (compared case-insensitively).
    pub method: String,
    /// Request path or pattern to match; supports `{param}` placeholders and `*` wildcards.
    pub path: String,
    /// Response returned when the mock matches.
    pub response: MockResponse,
    /// Whether the mock participates in matching (defaults to `true`).
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional latency in milliseconds — presumably applied by the serving
    /// layer; not used in this module.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional status-code override for the response — TODO confirm usage in serving layer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Additional request-matching criteria beyond method and path.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Optional priority for resolving overlapping mocks — consumed outside this module.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Optional scenario name for stateful (state-machine) mocking.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// State the scenario must be in for this mock to apply.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// State the scenario transitions to after this mock is served.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
}
123
/// Serde default for `MockConfig::enabled`: mocks are enabled unless stated otherwise.
fn default_true() -> bool {
    true
}
127
/// Response payload a mock returns when matched.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as an arbitrary JSON value.
    pub body: serde_json::Value,
    /// Optional response headers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
137
/// Fine-grained request-matching criteria evaluated by [`mock_matches_request`].
/// All populated criteria must pass for the mock to match.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RequestMatchCriteria {
    /// Required headers; names compared case-insensitively, values treated as
    /// regexes when they compile and as literals otherwise.
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub headers: std::collections::HashMap<String, String>,
    /// Required query parameters, matched by exact key and value.
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub query_params: std::collections::HashMap<String, String>,
    /// Regex (or literal fallback) the raw request body must match.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body_pattern: Option<String>,
    /// JSON path (e.g. `$.a.b[0]`) that must exist in a JSON body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_path: Option<String>,
    /// Restricted XPath expression that must match an XML body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub xpath: Option<String>,
    /// Mini-expression (`==`, `=~`, `contains`) evaluated by `evaluate_custom_matcher`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub custom_matcher: Option<String>,
}
160
/// Returns `true` when `mock` matches the incoming request described by
/// `method`, `path`, `headers`, `query_params`, and the optional raw `body`.
///
/// Matching proceeds in order: enabled flag, HTTP method (case-insensitive),
/// path pattern, then any optional [`RequestMatchCriteria`] (headers, query
/// parameters, body regex/literal, JSON path, XPath, custom expression).
/// The first failed check short-circuits to `false`.
pub fn mock_matches_request(
    mock: &MockConfig,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    // Disabled mocks never match.
    if !mock.enabled {
        return false;
    }

    // HTTP method comparison is case-insensitive.
    if mock.method.to_uppercase() != method.to_uppercase() {
        return false;
    }

    // Path must match the mock's pattern ({param} placeholders / wildcards).
    if !path_matches_pattern(&mock.path, path) {
        return false;
    }

    if let Some(criteria) = &mock.request_match {
        // Header names are compared case-insensitively; expected values are
        // treated as regexes when they compile, literal strings otherwise.
        for (key, expected_value) in &criteria.headers {
            let header_key_lower = key.to_lowercase();
            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);

            if let Some((_, actual_value)) = found {
                if let Ok(re) = Regex::new(expected_value) {
                    if !re.is_match(actual_value) {
                        return false;
                    }
                } else if actual_value != expected_value {
                    return false;
                }
            } else {
                // Required header absent.
                return false;
            }
        }

        // Query parameters are matched by exact key and exact value.
        for (key, expected_value) in &criteria.query_params {
            if let Some(actual_value) = query_params.get(key) {
                if actual_value != expected_value {
                    return false;
                }
            } else {
                return false;
            }
        }

        // Body pattern: regex when it compiles, literal comparison otherwise.
        // A missing body fails the check.
        if let Some(pattern) = &criteria.body_pattern {
            if let Some(body_bytes) = body {
                let body_str = String::from_utf8_lossy(body_bytes);
                if let Ok(re) = Regex::new(pattern) {
                    if !re.is_match(&body_str) {
                        return false;
                    }
                } else if body_str.as_ref() != pattern {
                    return false;
                }
            } else {
                return false;
            }
        }

        // JSON path: only rejects when the body parses as JSON and the path
        // is absent. NOTE(review): unlike the XPath check below, a missing,
        // non-UTF-8, or non-JSON body does NOT fail this criterion — confirm
        // whether that leniency is intentional.
        if let Some(json_path) = &criteria.json_path {
            if let Some(body_bytes) = body {
                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
                        if !json_path_exists(&json_value, json_path) {
                            return false;
                        }
                    }
                }
            }
        }

        // XPath: a missing or non-UTF-8 body fails the check.
        if let Some(xpath) = &criteria.xpath {
            if let Some(body_bytes) = body {
                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
                    if !xml_xpath_exists(body_str, xpath) {
                        return false;
                    }
                } else {
                    return false;
                }
            } else {
                return false;
            }
        }

        // Custom matcher mini-expressions (`==`, `=~`, `contains`).
        if let Some(custom) = &criteria.custom_matcher {
            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
                return false;
            }
        }
    }

    true
}
282
283fn path_matches_pattern(pattern: &str, path: &str) -> bool {
285 if pattern == path {
287 return true;
288 }
289
290 if pattern == "*" {
292 return true;
293 }
294
295 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
297 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
298
299 if pattern_parts.len() != path_parts.len() {
300 if pattern.contains('*') {
302 return matches_wildcard_pattern(pattern, path);
303 }
304 return false;
305 }
306
307 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
308 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
310 continue; }
312
313 if pattern_part != path_part {
314 return false;
315 }
316 }
317
318 true
319}
320
321fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
323 use regex::Regex;
324
325 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
327
328 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
329 return re.is_match(path);
330 }
331
332 false
333}
334
335fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
343 let path = if json_path == "$" {
344 return true;
345 } else if let Some(p) = json_path.strip_prefix("$.") {
346 p
347 } else if let Some(p) = json_path.strip_prefix('$') {
348 p.strip_prefix('.').unwrap_or(p)
349 } else {
350 json_path
351 };
352
353 let mut current = json;
354 for segment in split_json_path_segments(path) {
355 match segment {
356 JsonPathSegment::Field(name) => {
357 if let Some(obj) = current.as_object() {
358 if let Some(value) = obj.get(name) {
359 current = value;
360 } else {
361 return false;
362 }
363 } else {
364 return false;
365 }
366 }
367 JsonPathSegment::Index(idx) => {
368 if let Some(arr) = current.as_array() {
369 if let Some(value) = arr.get(idx) {
370 current = value;
371 } else {
372 return false;
373 }
374 } else {
375 return false;
376 }
377 }
378 JsonPathSegment::Wildcard => {
379 if let Some(arr) = current.as_array() {
380 return !arr.is_empty();
381 }
382 return false;
383 }
384 }
385 }
386 true
387}
388
/// One parsed segment of a simplified JSON path.
enum JsonPathSegment<'a> {
    /// An object field accessed by name.
    Field(&'a str),
    /// An array element accessed by index (`[N]`).
    Index(usize),
    /// The `[*]` wildcard over an array.
    Wildcard,
}
394
395fn split_json_path_segments(path: &str) -> Vec<JsonPathSegment<'_>> {
397 let mut segments = Vec::new();
398 for part in path.split('.') {
399 if part.is_empty() {
400 continue;
401 }
402 if let Some(bracket_start) = part.find('[') {
403 let field_name = &part[..bracket_start];
404 if !field_name.is_empty() {
405 segments.push(JsonPathSegment::Field(field_name));
406 }
407 let bracket_content = &part[bracket_start + 1..part.len() - 1];
408 if bracket_content == "*" {
409 segments.push(JsonPathSegment::Wildcard);
410 } else if let Ok(idx) = bracket_content.parse::<usize>() {
411 segments.push(JsonPathSegment::Index(idx));
412 }
413 } else {
414 segments.push(JsonPathSegment::Field(part));
415 }
416 }
417 segments
418}
419
/// One parsed step of the restricted XPath dialect used for body matching.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    /// Element name the step must match.
    name: String,
    /// When present, the element's trimmed text must equal this value
    /// (from a `[text()='…']` predicate).
    text_equals: Option<String>,
}
425
426fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
427 if segment.is_empty() {
428 return None;
429 }
430
431 let trimmed = segment.trim();
432 if let Some(bracket_start) = trimmed.find('[') {
433 if !trimmed.ends_with(']') {
434 return None;
435 }
436
437 let name = trimmed[..bracket_start].trim();
438 let predicate = &trimmed[bracket_start + 1..trimmed.len() - 1];
439 let predicate = predicate.trim();
440
441 if let Some(raw) = predicate.strip_prefix("text()=") {
443 let raw = raw.trim();
444 if raw.len() >= 2
445 && ((raw.starts_with('"') && raw.ends_with('"'))
446 || (raw.starts_with('\'') && raw.ends_with('\'')))
447 {
448 let text = raw[1..raw.len() - 1].to_string();
449 if !name.is_empty() {
450 return Some(XPathSegment {
451 name: name.to_string(),
452 text_equals: Some(text),
453 });
454 }
455 }
456 }
457
458 None
459 } else {
460 Some(XPathSegment {
461 name: trimmed.to_string(),
462 text_equals: None,
463 })
464 }
465}
466
467fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
468 if !node.is_element() {
469 return false;
470 }
471 if node.tag_name().name() != segment.name {
472 return false;
473 }
474 match &segment.text_equals {
475 Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
476 None => true,
477 }
478}
479
/// Evaluates a restricted XPath subset against `xml_body`, returning `true`
/// when at least one node matches.
///
/// Supported: absolute paths (`/a/b`), descendant-start paths (`//a/b`), and
/// `[text()='…']` predicates on any step. Unparseable XML and expressions not
/// starting with `/` or `//` log a warning and return `false`.
///
/// NOTE(review): steps that fail to parse are silently dropped by the
/// `filter_map` below, which *shortens* the path instead of rejecting the
/// expression — confirm whether a malformed step should fail the match.
fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
    let doc = match roxmltree::Document::parse(xml_body) {
        Ok(doc) => doc,
        Err(err) => {
            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
            return false;
        }
    };

    let expr = xpath.trim();
    if expr.is_empty() {
        return false;
    }

    // `//` starts matching at any descendant; `/` anchors at the root element.
    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
        (true, rest)
    } else if let Some(rest) = expr.strip_prefix('/') {
        (false, rest)
    } else {
        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
        return false;
    };

    let segments: Vec<XPathSegment> = path_str
        .split('/')
        .filter(|s| !s.trim().is_empty())
        .filter_map(parse_xpath_segment)
        .collect();

    if segments.is_empty() {
        return false;
    }

    if is_descendant {
        // Try every node in the document that matches the first step as an
        // anchor, then walk the remaining steps through children only.
        let first = &segments[0];
        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
            let mut frontier = vec![node];
            for segment in &segments[1..] {
                let mut next_frontier = Vec::new();
                for parent in &frontier {
                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
                        next_frontier.push(child);
                    }
                }
                if next_frontier.is_empty() {
                    // Dead end under this anchor; try the next candidate.
                    frontier.clear();
                    break;
                }
                frontier = next_frontier;
            }
            if !frontier.is_empty() {
                return true;
            }
        }
        false
    } else {
        // Absolute path: the first step must match the root element itself;
        // subsequent steps descend through children.
        let mut frontier = vec![doc.root_element()];
        for (index, segment) in segments.iter().enumerate() {
            let mut next_frontier = Vec::new();
            for parent in &frontier {
                if index == 0 {
                    if segment_matches(*parent, segment) {
                        next_frontier.push(*parent);
                    }
                    continue;
                }
                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
                    next_frontier.push(child);
                }
            }
            if next_frontier.is_empty() {
                return false;
            }
            frontier = next_frontier;
        }
        !frontier.is_empty()
    }
}
564
/// Evaluates a custom matcher mini-expression against the request.
///
/// Supported forms (checked in this order by substring detection, so the
/// first operator token found anywhere in the expression wins):
/// - `field == "value"` — exact equality,
/// - `field =~ "regex"` — regex match,
/// - `field contains "value"` — substring containment.
///
/// `field` may be `method`, `path`, `headers.<name>`, `query.<name>`, or (for
/// `contains` only) `body`. Unknown fields or unparseable expressions yield
/// `false`.
///
/// NOTE(review): operator detection via `contains(..)` is order-sensitive —
/// an expression whose *value* embeds another operator token (e.g.
/// `path =~ "a==b"`) is mis-split on the first token found. Confirm whether
/// expressions are constrained enough for this to be acceptable.
fn evaluate_custom_matcher(
    expression: &str,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    let expr = expression.trim();

    // Exact equality: `field == "value"`.
    if expr.contains("==") {
        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        // Strip surrounding double or single quotes from the literal.
        let expected_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "method" => method == expected_value,
            "path" => path == expected_value,
            _ if field.starts_with("headers.") => {
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ if field.starts_with("query.") => {
                let param_name = &field[6..];
                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ => false,
        }
    }
    // Regex match: `field =~ "pattern"`.
    else if expr.contains("=~") {
        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let pattern = parts[1].trim_matches('"').trim_matches('\'');

        // An invalid regex makes the whole expression evaluate to false.
        if let Ok(re) = Regex::new(pattern) {
            match field {
                "method" => re.is_match(method),
                "path" => re.is_match(path),
                _ if field.starts_with("headers.") => {
                    let header_name = &field[8..];
                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ if field.starts_with("query.") => {
                    let param_name = &field[6..];
                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ => false,
            }
        } else {
            false
        }
    }
    // Substring containment: `field contains "value"`.
    else if expr.contains("contains") {
        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let search_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "path" => path.contains(search_value),
            _ if field.starts_with("headers.") => {
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
            }
            _ if field.starts_with("body") => {
                // Missing bodies never contain anything.
                if let Some(body_bytes) = body {
                    let body_str = String::from_utf8_lossy(body_bytes);
                    body_str.contains(search_value)
                } else {
                    false
                }
            }
            _ => false,
        }
    } else {
        tracing::warn!("Unknown custom matcher expression format: {}", expr);
        false
    }
}
662
/// Runtime statistics reported by the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds elapsed since the management state was created.
    pub uptime_seconds: u64,
    /// Total requests counted via the shared request counter.
    pub total_requests: u64,
    /// Number of mocks currently registered (enabled or not).
    pub active_mocks: usize,
    /// Number of registered mocks with `enabled == true`.
    pub enabled_mocks: usize,
    /// Number of registered routes — currently reported as the mock count.
    pub registered_routes: usize,
}
677
/// Static server information reported by the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (from `CARGO_PKG_VERSION`).
    pub version: String,
    /// Port the server listens on.
    pub port: u16,
    /// Whether an OpenAPI spec was loaded.
    pub has_openapi_spec: bool,
    /// Filesystem path of the loaded spec, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
691
/// Shared, cloneable state threaded through every management API handler.
#[derive(Clone)]
pub struct ManagementState {
    /// In-memory registry of configured mocks.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// OpenAPI spec the server was started with, if any.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Filesystem path of the loaded spec, if any.
    pub spec_path: Option<String>,
    /// Port the server listens on (reported via `get_config`).
    pub port: u16,
    /// Creation instant, used for uptime reporting.
    pub start_time: std::time::Instant,
    /// Shared request counter surfaced in `get_stats`.
    pub request_counter: Arc<RwLock<u64>>,
    /// Runtime-editable proxy configuration, when proxying is enabled.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// SMTP mailbox registry backing the SMTP management endpoints.
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Handle to the mock MQTT broker, when running.
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Handle to the mock Kafka broker, when running.
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel fanning out broker message events to subscribers.
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Manager for scenario state machines used by stateful mocks.
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Broadcast channel for WebSocket mock-change notifications.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Registry of lifecycle hooks invoked on mock create/update/delete.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Per-rule explanations from the intelligent-behavior subsystem, keyed by id.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Chaos-engineering API state, when the feature is enabled.
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Live server configuration; required for bulk config updates to apply.
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
743
impl ManagementState {
    /// Creates a fresh management state with an empty mock registry, a zeroed
    /// request counter, and no optional subsystems attached. Use the
    /// `with_*` builder methods to attach hooks, brokers, and configuration.
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Capacity is env-configurable; only the Sender is stored —
                // receivers subscribe on demand.
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
        }
    }

    /// Attaches a lifecycle hook registry (builder-style).
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Attaches a WebSocket broadcast channel for mock-change notifications.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Attaches a runtime-editable proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Attaches the SMTP mailbox registry backing the SMTP endpoints.
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Attaches a handle to the mock MQTT broker.
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Attaches a handle to the mock Kafka broker.
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Attaches the chaos-engineering API state.
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Attaches the live server configuration; required for bulk config
    /// updates to actually apply.
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
854
855async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
857 let mocks = state.mocks.read().await;
858 Json(serde_json::json!({
859 "mocks": *mocks,
860 "total": mocks.len(),
861 "enabled": mocks.iter().filter(|m| m.enabled).count()
862 }))
863}
864
865async fn get_mock(
867 State(state): State<ManagementState>,
868 Path(id): Path<String>,
869) -> Result<Json<MockConfig>, StatusCode> {
870 let mocks = state.mocks.read().await;
871 mocks
872 .iter()
873 .find(|m| m.id == id)
874 .cloned()
875 .map(Json)
876 .ok_or(StatusCode::NOT_FOUND)
877}
878
879async fn create_mock(
881 State(state): State<ManagementState>,
882 Json(mut mock): Json<MockConfig>,
883) -> Result<Json<MockConfig>, StatusCode> {
884 let mut mocks = state.mocks.write().await;
885
886 if mock.id.is_empty() {
888 mock.id = uuid::Uuid::new_v4().to_string();
889 }
890
891 if mocks.iter().any(|m| m.id == mock.id) {
893 return Err(StatusCode::CONFLICT);
894 }
895
896 info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
897
898 if let Some(hooks) = &state.lifecycle_hooks {
900 let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
901 id: mock.id.clone(),
902 name: mock.name.clone(),
903 config: serde_json::to_value(&mock).unwrap_or_default(),
904 };
905 hooks.invoke_mock_created(&event).await;
906 }
907
908 mocks.push(mock.clone());
909
910 if let Some(tx) = &state.ws_broadcast {
912 let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
913 }
914
915 Ok(Json(mock))
916}
917
918async fn update_mock(
920 State(state): State<ManagementState>,
921 Path(id): Path<String>,
922 Json(updated_mock): Json<MockConfig>,
923) -> Result<Json<MockConfig>, StatusCode> {
924 let mut mocks = state.mocks.write().await;
925
926 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
927
928 let old_mock = mocks[position].clone();
930
931 info!("Updating mock: {}", id);
932 mocks[position] = updated_mock.clone();
933
934 if let Some(hooks) = &state.lifecycle_hooks {
936 let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
937 id: updated_mock.id.clone(),
938 name: updated_mock.name.clone(),
939 config: serde_json::to_value(&updated_mock).unwrap_or_default(),
940 };
941 hooks.invoke_mock_updated(&event).await;
942
943 if old_mock.enabled != updated_mock.enabled {
945 let state_event = if updated_mock.enabled {
946 mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
947 id: updated_mock.id.clone(),
948 }
949 } else {
950 mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
951 id: updated_mock.id.clone(),
952 }
953 };
954 hooks.invoke_mock_state_changed(&state_event).await;
955 }
956 }
957
958 if let Some(tx) = &state.ws_broadcast {
960 let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
961 }
962
963 Ok(Json(updated_mock))
964}
965
966async fn delete_mock(
968 State(state): State<ManagementState>,
969 Path(id): Path<String>,
970) -> Result<StatusCode, StatusCode> {
971 let mut mocks = state.mocks.write().await;
972
973 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
974
975 let deleted_mock = mocks[position].clone();
977
978 info!("Deleting mock: {}", id);
979 mocks.remove(position);
980
981 if let Some(hooks) = &state.lifecycle_hooks {
983 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
984 id: deleted_mock.id.clone(),
985 name: deleted_mock.name.clone(),
986 };
987 hooks.invoke_mock_deleted(&event).await;
988 }
989
990 if let Some(tx) = &state.ws_broadcast {
992 let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
993 }
994
995 Ok(StatusCode::NO_CONTENT)
996}
997
/// Request payload for the configuration validation endpoint.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// The configuration document to validate.
    pub config: serde_json::Value,
    /// Source format of `config`: `"yaml"`/`"yml"` or anything else for JSON
    /// (defaults to `"json"`).
    #[serde(default = "default_format")]
    pub format: String,
}
1007
/// Serde default for `ValidateConfigRequest::format`.
fn default_format() -> String {
    String::from("json")
}
1011
/// Validates a configuration document against `ServerConfig`.
///
/// For `format: "yaml"`/`"yml"` the JSON value is re-serialized to a string
/// and parsed with serde_yaml (every JSON document is valid YAML); otherwise
/// the value is deserialized directly. Responds 200 with `valid: true` on
/// success, or 400 with the parse error.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            // The payload arrives as a JSON value; serialize it back to text
            // so the YAML parser can validate it.
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                        .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
            .into_response(),
    }
}
1054
/// Request payload for the bulk configuration update endpoint.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// JSON object whose top-level keys overwrite the corresponding keys of
    /// the server configuration.
    pub updates: serde_json::Value,
}
1061
/// Applies a bulk configuration update.
///
/// Performs a *shallow* merge: each top-level key in `updates` replaces the
/// corresponding key of a default `ServerConfig`, the merged document is
/// re-validated by deserialization, and — when a live server config is
/// attached to the state — the validated result is written back. Responds 400
/// on a non-object payload or a merge that fails validation.
///
/// NOTE(review): the merge base is `ServerConfig::default()`, not the
/// currently-applied config — keys absent from `updates` reset to defaults.
/// Confirm that is the intended semantics.
async fn bulk_update_config(
    State(state): State<ManagementState>,
    Json(request): Json<BulkConfigUpdateRequest>,
) -> impl IntoResponse {
    // Only JSON objects can be merged key-by-key.
    if !request.updates.is_object() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Updates must be a JSON object"
            })),
        )
            .into_response();
    }

    use mockforge_core::config::ServerConfig;

    let base_config = ServerConfig::default();
    let base_json = match serde_json::to_value(&base_config) {
        Ok(v) => v,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Internal error",
                    "message": format!("Failed to serialize base config: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Shallow merge: top-level keys from `updates` overwrite the base.
    let mut merged = base_json.clone();
    if let (Some(merged_obj), Some(updates_obj)) =
        (merged.as_object_mut(), request.updates.as_object())
    {
        for (key, value) in updates_obj {
            merged_obj.insert(key.clone(), value.clone());
        }
    }

    // Re-deserializing validates the merged document as a whole.
    match serde_json::from_value::<ServerConfig>(merged) {
        Ok(validated_config) => {
            if let Some(ref config_lock) = state.server_config {
                let mut config = config_lock.write().await;
                *config = validated_config;
                Json(serde_json::json!({
                    "success": true,
                    "message": "Bulk configuration update applied successfully",
                    "updates_received": request.updates,
                    "validated": true,
                    "applied": true
                }))
                .into_response()
            } else {
                // Validation succeeded but there is nowhere to write it.
                Json(serde_json::json!({
                    "success": true,
                    "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
                    "updates_received": request.updates,
                    "validated": true,
                    "applied": false
                }))
                .into_response()
            }
        }
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid configuration",
                "message": format!("Configuration validation failed: {}", e),
                "validated": false
            })),
        )
            .into_response(),
    }
}
1151
1152async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1154 let mocks = state.mocks.read().await;
1155 let request_count = *state.request_counter.read().await;
1156
1157 Json(ServerStats {
1158 uptime_seconds: state.start_time.elapsed().as_secs(),
1159 total_requests: request_count,
1160 active_mocks: mocks.len(),
1161 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1162 registered_routes: mocks.len(), })
1164}
1165
1166async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1168 Json(ServerConfig {
1169 version: env!("CARGO_PKG_VERSION").to_string(),
1170 port: state.port,
1171 has_openapi_spec: state.spec.is_some(),
1172 spec_path: state.spec_path.clone(),
1173 })
1174}
1175
1176async fn health_check() -> Json<serde_json::Value> {
1178 Json(serde_json::json!({
1179 "status": "healthy",
1180 "service": "mockforge-management",
1181 "timestamp": chrono::Utc::now().to_rfc3339()
1182 }))
1183}
1184
/// Serialization formats supported by the mock export endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON (the default).
    Json,
    /// YAML.
    Yaml,
}
1194
1195async fn export_mocks(
1197 State(state): State<ManagementState>,
1198 Query(params): Query<std::collections::HashMap<String, String>>,
1199) -> Result<(StatusCode, String), StatusCode> {
1200 let mocks = state.mocks.read().await;
1201
1202 let format = params
1203 .get("format")
1204 .map(|f| match f.as_str() {
1205 "yaml" | "yml" => ExportFormat::Yaml,
1206 _ => ExportFormat::Json,
1207 })
1208 .unwrap_or(ExportFormat::Json);
1209
1210 match format {
1211 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1212 .map(|json| (StatusCode::OK, json))
1213 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1214 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1215 .map(|yaml| (StatusCode::OK, yaml))
1216 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1217 }
1218}
1219
1220async fn import_mocks(
1222 State(state): State<ManagementState>,
1223 Json(mocks): Json<Vec<MockConfig>>,
1224) -> impl IntoResponse {
1225 let mut current_mocks = state.mocks.write().await;
1226 current_mocks.clear();
1227 current_mocks.extend(mocks);
1228 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1229}
1230
/// GET handler returning every stored SMTP email as JSON.
///
/// Responds 501 when no SMTP registry is attached to the state, and 500 when
/// the registry fails to produce the emails.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_emails() {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1255
/// GET handler returning a single stored SMTP email by id.
///
/// Responds 404 when the id is unknown, 500 on registry failure, and 501 when
/// no SMTP registry is attached to the state.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_email_by_id(&id) {
            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
            Ok(None) => (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Email not found",
                    "id": id
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve email",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1290
/// DELETE-style handler wiping every stored SMTP email.
///
/// Responds 500 on registry failure and 501 when no SMTP registry is attached
/// to the state.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.clear_mailbox() {
            Ok(()) => (
                StatusCode::OK,
                Json(serde_json::json!({
                    "message": "Mailbox cleared successfully"
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to clear mailbox",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1320
/// Export the SMTP mailbox in the requested format.
///
/// Currently a stub: mailbox export is not reachable over the HTTP API, so
/// this always answers 501 and echoes the requested format back to the
/// caller. The `format` query parameter defaults to "json" when absent.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Borrow the parameter with a static fallback; the previous version
    // (`unwrap_or(&"json".to_string()).clone()`) allocated a temporary
    // String on every request even when the parameter was present.
    let format = params.get("format").map(String::as_str).unwrap_or("json");
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1336
/// Search captured emails using filters supplied as query parameters.
///
/// Supported parameters: `sender`, `recipient`, `subject`, `body`,
/// `since`/`until` (RFC 3339 timestamps), `regex` and `case_sensitive`
/// (the literal string "true" enables them). Responds 501 when no SMTP
/// registry is attached to the management state.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let Some(ref registry) = state.smtp_registry else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    // Timestamps arrive as RFC 3339 strings; unparsable values are ignored.
    let parse_ts = |key: &str| {
        params
            .get(key)
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|dt| dt.with_timezone(&chrono::Utc))
    };
    // Boolean flags are enabled only by the exact string "true".
    let flag = |key: &str| params.get(key).map(|s| s == "true").unwrap_or(false);

    let filters = EmailSearchFilters {
        sender: params.get("sender").cloned(),
        recipient: params.get("recipient").cloned(),
        subject: params.get("subject").cloned(),
        body: params.get("body").cloned(),
        since: parse_ts("since"),
        until: parse_ts("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            })),
        ),
    }
}
1381
/// Snapshot of MQTT broker statistics, as returned by `GET /mqtt/stats`.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of clients currently connected to the broker.
    pub connected_clients: usize,
    /// Number of active topics reported by the broker.
    /// NOTE(review): derived from `get_active_topics().len()` — the exact
    /// definition of "active" lives in the broker implementation.
    pub active_topics: usize,
    /// Count of retained messages held by the broker.
    pub retained_messages: usize,
    /// Total number of subscriptions across all clients.
    pub total_subscriptions: usize,
}
1395
/// Report aggregate MQTT broker statistics as JSON.
///
/// Responds 503 with a plain-text body when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            // Query the broker in the same order as before: clients,
            // topics, then topic-level stats.
            let client_count = broker.get_connected_clients().await.len();
            let topic_count = broker.get_active_topics().await.len();
            let topic_stats = broker.get_topic_stats().await;

            let snapshot = MqttBrokerStats {
                connected_clients: client_count,
                active_topics: topic_count,
                retained_messages: topic_stats.retained_messages,
                total_subscriptions: topic_stats.total_subscriptions,
            };

            Json(snapshot).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1416
/// List the clients currently connected to the MQTT broker.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let connected = broker.get_connected_clients().await;
            Json(serde_json::json!({ "clients": connected })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1429
/// List the topics currently active on the MQTT broker.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let active = broker.get_active_topics().await;
            Json(serde_json::json!({ "topics": active })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1442
/// Forcibly disconnect a single MQTT client by its client id.
///
/// Responds 200 on success, 500 if the broker refuses, and 503 when no
/// broker is attached.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => {
            (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
        }
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                .into_response()
        }
    }
}
1462
/// Request body for publishing a single MQTT message.
///
/// NOTE(review): the publish handlers in this file deserialize into
/// `serde_json::Value` and pull fields out manually rather than using this
/// type — confirm whether this struct is still wired up anywhere.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to.
    pub topic: String,
    /// Message payload as a UTF-8 string.
    pub payload: String,
    /// Quality-of-service level; defaults to 0 via `default_qos`.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether the broker should retain the message; defaults to false.
    #[serde(default)]
    pub retain: bool,
}
1480
/// Serde default for `MqttPublishRequest::qos`: QoS 0 (at-most-once).
#[cfg(feature = "mqtt")]
fn default_qos() -> u8 {
    0
}
1485
/// Publish a single MQTT message through the embedded broker.
///
/// Accepts a free-form JSON body with `topic` (string, required),
/// `payload` (string, required), `qos` (0-2, default 0) and `retain`
/// (bool, default false). Missing required fields yield 400 before the
/// broker is consulted; an absent broker yields 503; an out-of-range QoS
/// yields 400 only when a broker is present.
/// NOTE(review): parses the body manually instead of using
/// `Json<MqttPublishRequest>`, presumably to control the error response
/// shape — confirm before consolidating.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Pull the individual fields out of the untyped JSON body.
    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    // Both topic and payload are mandatory; reject before touching the broker.
    if topic.is_none() || payload.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    }

    let topic = topic.unwrap();
    let payload = payload.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        // MQTT only defines QoS levels 0, 1 and 2.
        if qos > 2 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid QoS",
                    "message": "QoS must be 0, 1, or 2"
                })),
            );
        }

        let payload_bytes = payload.as_bytes().to_vec();
        // Synthetic client id marking messages injected via this API.
        let client_id = "mockforge-management-api".to_string();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| format!("{}", e));

        match publish_result {
            Ok(_) => {
                // Mirror the published message onto the broadcast channel for
                // SSE subscribers; send errors (no receivers) are ignored.
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload: payload.clone(),
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                (
                    StatusCode::OK,
                    Json(serde_json::json!({
                        "success": true,
                        "message": format!("Message published to topic '{}'", topic),
                        "topic": topic,
                        "qos": qos,
                        "retain": retain
                    })),
                )
            }
            Err(error_msg) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to publish message",
                    "message": error_msg
                })),
            ),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1573
1574#[cfg(not(feature = "mqtt"))]
1575async fn publish_mqtt_message_handler(
1577 State(_state): State<ManagementState>,
1578 Json(_request): Json<serde_json::Value>,
1579) -> impl IntoResponse {
1580 (
1581 StatusCode::SERVICE_UNAVAILABLE,
1582 Json(serde_json::json!({
1583 "error": "MQTT feature not enabled",
1584 "message": "MQTT support is not compiled into this build"
1585 })),
1586 )
1587}
1588
/// Request body for publishing a batch of MQTT messages.
///
/// NOTE(review): like `MqttPublishRequest`, the batch handler parses the
/// body manually rather than deserializing into this type — confirm it is
/// still needed.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    /// Pause between consecutive messages in milliseconds; defaults to 100
    /// via `default_delay`.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1599
/// Serde default for `MqttBatchPublishRequest::delay_ms` (100 ms).
#[cfg(feature = "mqtt")]
fn default_delay() -> u64 {
    100
}
1604
/// Publish a batch of MQTT messages sequentially with an optional
/// inter-message delay.
///
/// Accepts `{ "messages": [...], "delay_ms": u64 }` where each message has
/// the same shape as the single-publish endpoint. Per-message validation or
/// publish failures are recorded in the `results` array instead of aborting
/// the batch; the overall response is 200 with success/failure counts.
/// NOTE(review): parses the body manually instead of using
/// `Json<MqttBatchPublishRequest>` — confirm the struct is still needed.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    // Delay between consecutive publishes; defaults to 100 ms.
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        // One result entry per input message, in order.
        let mut results = Vec::new();
        // Synthetic client id marking messages injected via this API.
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Validation failures mark this entry failed but continue the batch.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            // MQTT only defines QoS levels 0, 1 and 2.
            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Mirror the published message onto the broadcast channel
                    // for SSE subscribers; send errors (no receivers) ignored.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Sleep between messages, but not after the last one.
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1734
1735#[cfg(not(feature = "mqtt"))]
1736async fn publish_mqtt_batch_handler(
1738 State(_state): State<ManagementState>,
1739 Json(_request): Json<serde_json::Value>,
1740) -> impl IntoResponse {
1741 (
1742 StatusCode::SERVICE_UNAVAILABLE,
1743 Json(serde_json::json!({
1744 "error": "MQTT feature not enabled",
1745 "message": "MQTT support is not compiled into this build"
1746 })),
1747 )
1748}
1749
/// Request body for explicitly setting a route's or group's migration mode.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    /// One of "mock", "shadow", "real" or "auto" (matched case-insensitively).
    mode: String,
}
1757
1758async fn get_migration_routes(
1760 State(state): State<ManagementState>,
1761) -> Result<Json<serde_json::Value>, StatusCode> {
1762 let proxy_config = match &state.proxy_config {
1763 Some(config) => config,
1764 None => {
1765 return Ok(Json(serde_json::json!({
1766 "error": "Migration not configured. Proxy config not available."
1767 })));
1768 }
1769 };
1770
1771 let config = proxy_config.read().await;
1772 let routes = config.get_migration_routes();
1773
1774 Ok(Json(serde_json::json!({
1775 "routes": routes
1776 })))
1777}
1778
1779async fn toggle_route_migration(
1781 State(state): State<ManagementState>,
1782 Path(pattern): Path<String>,
1783) -> Result<Json<serde_json::Value>, StatusCode> {
1784 let proxy_config = match &state.proxy_config {
1785 Some(config) => config,
1786 None => {
1787 return Ok(Json(serde_json::json!({
1788 "error": "Migration not configured. Proxy config not available."
1789 })));
1790 }
1791 };
1792
1793 let mut config = proxy_config.write().await;
1794 let new_mode = match config.toggle_route_migration(&pattern) {
1795 Some(mode) => mode,
1796 None => {
1797 return Ok(Json(serde_json::json!({
1798 "error": format!("Route pattern not found: {}", pattern)
1799 })));
1800 }
1801 };
1802
1803 Ok(Json(serde_json::json!({
1804 "pattern": pattern,
1805 "mode": format!("{:?}", new_mode).to_lowercase()
1806 })))
1807}
1808
1809async fn set_route_migration_mode(
1811 State(state): State<ManagementState>,
1812 Path(pattern): Path<String>,
1813 Json(request): Json<SetMigrationModeRequest>,
1814) -> Result<Json<serde_json::Value>, StatusCode> {
1815 let proxy_config = match &state.proxy_config {
1816 Some(config) => config,
1817 None => {
1818 return Ok(Json(serde_json::json!({
1819 "error": "Migration not configured. Proxy config not available."
1820 })));
1821 }
1822 };
1823
1824 use mockforge_core::proxy::config::MigrationMode;
1825 let mode = match request.mode.to_lowercase().as_str() {
1826 "mock" => MigrationMode::Mock,
1827 "shadow" => MigrationMode::Shadow,
1828 "real" => MigrationMode::Real,
1829 "auto" => MigrationMode::Auto,
1830 _ => {
1831 return Ok(Json(serde_json::json!({
1832 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1833 })));
1834 }
1835 };
1836
1837 let mut config = proxy_config.write().await;
1838 let updated = config.update_rule_migration_mode(&pattern, mode);
1839
1840 if !updated {
1841 return Ok(Json(serde_json::json!({
1842 "error": format!("Route pattern not found: {}", pattern)
1843 })));
1844 }
1845
1846 Ok(Json(serde_json::json!({
1847 "pattern": pattern,
1848 "mode": format!("{:?}", mode).to_lowercase()
1849 })))
1850}
1851
1852async fn toggle_group_migration(
1854 State(state): State<ManagementState>,
1855 Path(group): Path<String>,
1856) -> Result<Json<serde_json::Value>, StatusCode> {
1857 let proxy_config = match &state.proxy_config {
1858 Some(config) => config,
1859 None => {
1860 return Ok(Json(serde_json::json!({
1861 "error": "Migration not configured. Proxy config not available."
1862 })));
1863 }
1864 };
1865
1866 let mut config = proxy_config.write().await;
1867 let new_mode = config.toggle_group_migration(&group);
1868
1869 Ok(Json(serde_json::json!({
1870 "group": group,
1871 "mode": format!("{:?}", new_mode).to_lowercase()
1872 })))
1873}
1874
1875async fn set_group_migration_mode(
1877 State(state): State<ManagementState>,
1878 Path(group): Path<String>,
1879 Json(request): Json<SetMigrationModeRequest>,
1880) -> Result<Json<serde_json::Value>, StatusCode> {
1881 let proxy_config = match &state.proxy_config {
1882 Some(config) => config,
1883 None => {
1884 return Ok(Json(serde_json::json!({
1885 "error": "Migration not configured. Proxy config not available."
1886 })));
1887 }
1888 };
1889
1890 use mockforge_core::proxy::config::MigrationMode;
1891 let mode = match request.mode.to_lowercase().as_str() {
1892 "mock" => MigrationMode::Mock,
1893 "shadow" => MigrationMode::Shadow,
1894 "real" => MigrationMode::Real,
1895 "auto" => MigrationMode::Auto,
1896 _ => {
1897 return Ok(Json(serde_json::json!({
1898 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1899 })));
1900 }
1901 };
1902
1903 let mut config = proxy_config.write().await;
1904 config.update_group_migration_mode(&group, mode);
1905
1906 Ok(Json(serde_json::json!({
1907 "group": group,
1908 "mode": format!("{:?}", mode).to_lowercase()
1909 })))
1910}
1911
1912async fn get_migration_groups(
1914 State(state): State<ManagementState>,
1915) -> Result<Json<serde_json::Value>, StatusCode> {
1916 let proxy_config = match &state.proxy_config {
1917 Some(config) => config,
1918 None => {
1919 return Ok(Json(serde_json::json!({
1920 "error": "Migration not configured. Proxy config not available."
1921 })));
1922 }
1923 };
1924
1925 let config = proxy_config.read().await;
1926 let groups = config.get_migration_groups();
1927
1928 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1930 .into_iter()
1931 .map(|(name, info)| {
1932 (
1933 name,
1934 serde_json::json!({
1935 "name": info.name,
1936 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1937 "route_count": info.route_count
1938 }),
1939 )
1940 })
1941 .collect();
1942
1943 Ok(Json(serde_json::json!(groups_json)))
1944}
1945
1946async fn get_migration_status(
1948 State(state): State<ManagementState>,
1949) -> Result<Json<serde_json::Value>, StatusCode> {
1950 let proxy_config = match &state.proxy_config {
1951 Some(config) => config,
1952 None => {
1953 return Ok(Json(serde_json::json!({
1954 "error": "Migration not configured. Proxy config not available."
1955 })));
1956 }
1957 };
1958
1959 let config = proxy_config.read().await;
1960 let routes = config.get_migration_routes();
1961 let groups = config.get_migration_groups();
1962
1963 let mut mock_count = 0;
1964 let mut shadow_count = 0;
1965 let mut real_count = 0;
1966 let mut auto_count = 0;
1967
1968 for route in &routes {
1969 match route.migration_mode {
1970 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1971 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1972 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1973 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1974 }
1975 }
1976
1977 Ok(Json(serde_json::json!({
1978 "total_routes": routes.len(),
1979 "mock_routes": mock_count,
1980 "shadow_routes": shadow_count,
1981 "real_routes": real_count,
1982 "auto_routes": auto_count,
1983 "total_groups": groups.len(),
1984 "migration_enabled": config.migration_enabled
1985 })))
1986}
1987
/// Request body for creating or updating a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response" (anything else is rejected by the
    /// create handler).
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Response status codes the rule applies to (used by response rules);
    /// defaults to empty when omitted.
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Transforms to apply to the body; the create handler requires at
    /// least one.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active; defaults to true via `default_true`.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
2007
/// One body transform within a proxy rule, in API (string) form.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// Path of the field to transform.
    /// NOTE(review): path syntax is defined by the core proxy crate's
    /// `BodyTransform` — confirm against that documentation.
    pub path: String,
    /// Replacement value.
    pub replace: String,
    /// Operation name: "replace", "add" or "remove"; the handlers treat any
    /// other value as "replace". Defaults to the empty string when omitted.
    #[serde(default)]
    pub operation: String,
}
2019
/// A proxy rule as returned by the list endpoint.
///
/// `id` is positional: request rules occupy `0..request_count` and response
/// rules continue from `request_count`; ids shift when rules are removed.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Positional rule identifier (see type-level docs).
    pub id: usize,
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response".
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes the rule applies to (always empty for request rules).
    pub status_codes: Vec<u16>,
    /// Transforms in API (string) form.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active.
    pub enabled: bool,
}
2037
2038async fn list_proxy_rules(
2040 State(state): State<ManagementState>,
2041) -> Result<Json<serde_json::Value>, StatusCode> {
2042 let proxy_config = match &state.proxy_config {
2043 Some(config) => config,
2044 None => {
2045 return Ok(Json(serde_json::json!({
2046 "error": "Proxy not configured. Proxy config not available."
2047 })));
2048 }
2049 };
2050
2051 let config = proxy_config.read().await;
2052
2053 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
2054
2055 for (idx, rule) in config.request_replacements.iter().enumerate() {
2057 rules.push(ProxyRuleResponse {
2058 id: idx,
2059 pattern: rule.pattern.clone(),
2060 rule_type: "request".to_string(),
2061 status_codes: Vec::new(),
2062 body_transforms: rule
2063 .body_transforms
2064 .iter()
2065 .map(|t| BodyTransformRequest {
2066 path: t.path.clone(),
2067 replace: t.replace.clone(),
2068 operation: format!("{:?}", t.operation).to_lowercase(),
2069 })
2070 .collect(),
2071 enabled: rule.enabled,
2072 });
2073 }
2074
2075 let request_count = config.request_replacements.len();
2077 for (idx, rule) in config.response_replacements.iter().enumerate() {
2078 rules.push(ProxyRuleResponse {
2079 id: request_count + idx,
2080 pattern: rule.pattern.clone(),
2081 rule_type: "response".to_string(),
2082 status_codes: rule.status_codes.clone(),
2083 body_transforms: rule
2084 .body_transforms
2085 .iter()
2086 .map(|t| BodyTransformRequest {
2087 path: t.path.clone(),
2088 replace: t.replace.clone(),
2089 operation: format!("{:?}", t.operation).to_lowercase(),
2090 })
2091 .collect(),
2092 enabled: rule.enabled,
2093 });
2094 }
2095
2096 Ok(Json(serde_json::json!({
2097 "rules": rules
2098 })))
2099}
2100
2101async fn create_proxy_rule(
2103 State(state): State<ManagementState>,
2104 Json(request): Json<ProxyRuleRequest>,
2105) -> Result<Json<serde_json::Value>, StatusCode> {
2106 let proxy_config = match &state.proxy_config {
2107 Some(config) => config,
2108 None => {
2109 return Ok(Json(serde_json::json!({
2110 "error": "Proxy not configured. Proxy config not available."
2111 })));
2112 }
2113 };
2114
2115 if request.body_transforms.is_empty() {
2117 return Ok(Json(serde_json::json!({
2118 "error": "At least one body transform is required"
2119 })));
2120 }
2121
2122 let body_transforms: Vec<BodyTransform> = request
2123 .body_transforms
2124 .iter()
2125 .map(|t| {
2126 let op = match t.operation.as_str() {
2127 "replace" => TransformOperation::Replace,
2128 "add" => TransformOperation::Add,
2129 "remove" => TransformOperation::Remove,
2130 _ => TransformOperation::Replace,
2131 };
2132 BodyTransform {
2133 path: t.path.clone(),
2134 replace: t.replace.clone(),
2135 operation: op,
2136 }
2137 })
2138 .collect();
2139
2140 let new_rule = BodyTransformRule {
2141 pattern: request.pattern.clone(),
2142 status_codes: request.status_codes.clone(),
2143 body_transforms,
2144 enabled: request.enabled,
2145 };
2146
2147 let mut config = proxy_config.write().await;
2148
2149 let rule_id = if request.rule_type == "request" {
2150 config.request_replacements.push(new_rule);
2151 config.request_replacements.len() - 1
2152 } else if request.rule_type == "response" {
2153 config.response_replacements.push(new_rule);
2154 config.request_replacements.len() + config.response_replacements.len() - 1
2155 } else {
2156 return Ok(Json(serde_json::json!({
2157 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2158 })));
2159 };
2160
2161 Ok(Json(serde_json::json!({
2162 "id": rule_id,
2163 "message": "Rule created successfully"
2164 })))
2165}
2166
2167async fn get_proxy_rule(
2169 State(state): State<ManagementState>,
2170 Path(id): Path<String>,
2171) -> Result<Json<serde_json::Value>, StatusCode> {
2172 let proxy_config = match &state.proxy_config {
2173 Some(config) => config,
2174 None => {
2175 return Ok(Json(serde_json::json!({
2176 "error": "Proxy not configured. Proxy config not available."
2177 })));
2178 }
2179 };
2180
2181 let config = proxy_config.read().await;
2182 let rule_id: usize = match id.parse() {
2183 Ok(id) => id,
2184 Err(_) => {
2185 return Ok(Json(serde_json::json!({
2186 "error": format!("Invalid rule ID: {}", id)
2187 })));
2188 }
2189 };
2190
2191 let request_count = config.request_replacements.len();
2192
2193 if rule_id < request_count {
2194 let rule = &config.request_replacements[rule_id];
2196 Ok(Json(serde_json::json!({
2197 "id": rule_id,
2198 "pattern": rule.pattern,
2199 "type": "request",
2200 "status_codes": [],
2201 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2202 "path": t.path,
2203 "replace": t.replace,
2204 "operation": format!("{:?}", t.operation).to_lowercase()
2205 })).collect::<Vec<_>>(),
2206 "enabled": rule.enabled
2207 })))
2208 } else if rule_id < request_count + config.response_replacements.len() {
2209 let response_idx = rule_id - request_count;
2211 let rule = &config.response_replacements[response_idx];
2212 Ok(Json(serde_json::json!({
2213 "id": rule_id,
2214 "pattern": rule.pattern,
2215 "type": "response",
2216 "status_codes": rule.status_codes,
2217 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2218 "path": t.path,
2219 "replace": t.replace,
2220 "operation": format!("{:?}", t.operation).to_lowercase()
2221 })).collect::<Vec<_>>(),
2222 "enabled": rule.enabled
2223 })))
2224 } else {
2225 Ok(Json(serde_json::json!({
2226 "error": format!("Rule ID {} not found", rule_id)
2227 })))
2228 }
2229}
2230
2231async fn update_proxy_rule(
2233 State(state): State<ManagementState>,
2234 Path(id): Path<String>,
2235 Json(request): Json<ProxyRuleRequest>,
2236) -> Result<Json<serde_json::Value>, StatusCode> {
2237 let proxy_config = match &state.proxy_config {
2238 Some(config) => config,
2239 None => {
2240 return Ok(Json(serde_json::json!({
2241 "error": "Proxy not configured. Proxy config not available."
2242 })));
2243 }
2244 };
2245
2246 let mut config = proxy_config.write().await;
2247 let rule_id: usize = match id.parse() {
2248 Ok(id) => id,
2249 Err(_) => {
2250 return Ok(Json(serde_json::json!({
2251 "error": format!("Invalid rule ID: {}", id)
2252 })));
2253 }
2254 };
2255
2256 let body_transforms: Vec<BodyTransform> = request
2257 .body_transforms
2258 .iter()
2259 .map(|t| {
2260 let op = match t.operation.as_str() {
2261 "replace" => TransformOperation::Replace,
2262 "add" => TransformOperation::Add,
2263 "remove" => TransformOperation::Remove,
2264 _ => TransformOperation::Replace,
2265 };
2266 BodyTransform {
2267 path: t.path.clone(),
2268 replace: t.replace.clone(),
2269 operation: op,
2270 }
2271 })
2272 .collect();
2273
2274 let updated_rule = BodyTransformRule {
2275 pattern: request.pattern.clone(),
2276 status_codes: request.status_codes.clone(),
2277 body_transforms,
2278 enabled: request.enabled,
2279 };
2280
2281 let request_count = config.request_replacements.len();
2282
2283 if rule_id < request_count {
2284 config.request_replacements[rule_id] = updated_rule;
2286 } else if rule_id < request_count + config.response_replacements.len() {
2287 let response_idx = rule_id - request_count;
2289 config.response_replacements[response_idx] = updated_rule;
2290 } else {
2291 return Ok(Json(serde_json::json!({
2292 "error": format!("Rule ID {} not found", rule_id)
2293 })));
2294 }
2295
2296 Ok(Json(serde_json::json!({
2297 "id": rule_id,
2298 "message": "Rule updated successfully"
2299 })))
2300}
2301
2302async fn delete_proxy_rule(
2304 State(state): State<ManagementState>,
2305 Path(id): Path<String>,
2306) -> Result<Json<serde_json::Value>, StatusCode> {
2307 let proxy_config = match &state.proxy_config {
2308 Some(config) => config,
2309 None => {
2310 return Ok(Json(serde_json::json!({
2311 "error": "Proxy not configured. Proxy config not available."
2312 })));
2313 }
2314 };
2315
2316 let mut config = proxy_config.write().await;
2317 let rule_id: usize = match id.parse() {
2318 Ok(id) => id,
2319 Err(_) => {
2320 return Ok(Json(serde_json::json!({
2321 "error": format!("Invalid rule ID: {}", id)
2322 })));
2323 }
2324 };
2325
2326 let request_count = config.request_replacements.len();
2327
2328 if rule_id < request_count {
2329 config.request_replacements.remove(rule_id);
2331 } else if rule_id < request_count + config.response_replacements.len() {
2332 let response_idx = rule_id - request_count;
2334 config.response_replacements.remove(response_idx);
2335 } else {
2336 return Ok(Json(serde_json::json!({
2337 "error": format!("Rule ID {} not found", rule_id)
2338 })));
2339 }
2340
2341 Ok(Json(serde_json::json!({
2342 "id": rule_id,
2343 "message": "Rule deleted successfully"
2344 })))
2345}
2346
/// Return a full snapshot of the proxy configuration plus a paginated view
/// of all transform rules (request rules first, then response rules).
///
/// Query parameters: `limit` (default 50) and `offset` (default 0) page
/// over the combined rule list.
async fn get_proxy_inspect(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    // Unparsable limit/offset values silently fall back to the defaults.
    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
    let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);

    let proxy_config = match &state.proxy_config {
        Some(config) => config.read().await,
        None => {
            return Ok(Json(serde_json::json!({
                "error": "Proxy not configured. Proxy config not available."
            })));
        }
    };

    let mut rules = Vec::new();
    for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
        rules.push(serde_json::json!({
            "id": idx,
            "kind": "request",
            "pattern": rule.pattern,
            "enabled": rule.enabled,
            "status_codes": rule.status_codes,
            "transform_count": rule.body_transforms.len(),
            // NOTE(review): `operation` is serialized via serde here, while
            // the /proxy/rules endpoints emit the lowercased Debug form —
            // confirm the two representations agree for consumers.
            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
                "path": t.path,
                "operation": t.operation,
                "replace": t.replace
            })).collect::<Vec<_>>()
        }));
    }
    // Response-rule ids continue the positional sequence after requests.
    let request_rule_count = rules.len();
    for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
        rules.push(serde_json::json!({
            "id": request_rule_count + idx,
            "kind": "response",
            "pattern": rule.pattern,
            "enabled": rule.enabled,
            "status_codes": rule.status_codes,
            "transform_count": rule.body_transforms.len(),
            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
                "path": t.path,
                "operation": t.operation,
                "replace": t.replace
            })).collect::<Vec<_>>()
        }));
    }

    // Paginate over the combined (request + response) rule list.
    let total = rules.len();
    let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();

    Ok(Json(serde_json::json!({
        "enabled": proxy_config.enabled,
        "target_url": proxy_config.target_url,
        "prefix": proxy_config.prefix,
        "timeout_seconds": proxy_config.timeout_seconds,
        "follow_redirects": proxy_config.follow_redirects,
        "passthrough_by_default": proxy_config.passthrough_by_default,
        "rules": paged_rules,
        "request_rule_count": request_rule_count,
        "response_rule_count": total.saturating_sub(request_rule_count),
        "limit": limit,
        "offset": offset,
        "total": total
    })))
}
2415
2416pub fn management_router(state: ManagementState) -> Router {
2418 let router = Router::new()
2419 .route("/health", get(health_check))
2420 .route("/stats", get(get_stats))
2421 .route("/config", get(get_config))
2422 .route("/config/validate", post(validate_config))
2423 .route("/config/bulk", post(bulk_update_config))
2424 .route("/mocks", get(list_mocks))
2425 .route("/mocks", post(create_mock))
2426 .route("/mocks/{id}", get(get_mock))
2427 .route("/mocks/{id}", put(update_mock))
2428 .route("/mocks/{id}", delete(delete_mock))
2429 .route("/export", get(export_mocks))
2430 .route("/import", post(import_mocks));
2431
2432 #[cfg(feature = "smtp")]
2433 let router = router
2434 .route("/smtp/mailbox", get(list_smtp_emails))
2435 .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2436 .route("/smtp/mailbox/{id}", get(get_smtp_email))
2437 .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2438 .route("/smtp/mailbox/search", get(search_smtp_emails));
2439
2440 #[cfg(not(feature = "smtp"))]
2441 let router = router;
2442
2443 #[cfg(feature = "mqtt")]
2445 let router = router
2446 .route("/mqtt/stats", get(get_mqtt_stats))
2447 .route("/mqtt/clients", get(get_mqtt_clients))
2448 .route("/mqtt/topics", get(get_mqtt_topics))
2449 .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2450 .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2451 .route("/mqtt/publish", post(publish_mqtt_message_handler))
2452 .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2453
2454 #[cfg(not(feature = "mqtt"))]
2455 let router = router
2456 .route("/mqtt/publish", post(publish_mqtt_message_handler))
2457 .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2458
2459 #[cfg(feature = "kafka")]
2460 let router = router
2461 .route("/kafka/stats", get(get_kafka_stats))
2462 .route("/kafka/topics", get(get_kafka_topics))
2463 .route("/kafka/topics/{topic}", get(get_kafka_topic))
2464 .route("/kafka/groups", get(get_kafka_groups))
2465 .route("/kafka/groups/{group_id}", get(get_kafka_group))
2466 .route("/kafka/produce", post(produce_kafka_message))
2467 .route("/kafka/produce/batch", post(produce_kafka_batch))
2468 .route("/kafka/messages/stream", get(kafka_messages_stream));
2469
2470 #[cfg(not(feature = "kafka"))]
2471 let router = router;
2472
2473 let router = router
2475 .route("/migration/routes", get(get_migration_routes))
2476 .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2477 .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2478 .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2479 .route("/migration/groups/{group}", put(set_group_migration_mode))
2480 .route("/migration/groups", get(get_migration_groups))
2481 .route("/migration/status", get(get_migration_status));
2482
2483 let router = router
2485 .route("/proxy/rules", get(list_proxy_rules))
2486 .route("/proxy/rules", post(create_proxy_rule))
2487 .route("/proxy/rules/{id}", get(get_proxy_rule))
2488 .route("/proxy/rules/{id}", put(update_proxy_rule))
2489 .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2490 .route("/proxy/inspect", get(get_proxy_inspect));
2491
2492 let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2494
2495 let router = router.nest(
2497 "/snapshot-diff",
2498 crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2499 );
2500
2501 #[cfg(feature = "behavioral-cloning")]
2502 let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2503
2504 let router = router
2505 .route("/mockai/learn", post(learn_from_examples))
2506 .route("/mockai/rules/explanations", get(list_rule_explanations))
2507 .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2508 .route("/chaos/config", get(get_chaos_config))
2509 .route("/chaos/config", post(update_chaos_config))
2510 .route("/network/profiles", get(list_network_profiles))
2511 .route("/network/profile/apply", post(apply_network_profile));
2512
2513 let router =
2515 router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2516
2517 router.with_state(state)
2518}
2519
#[cfg(feature = "kafka")]
/// Aggregate broker statistics returned by `GET /kafka/stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics currently known to the broker.
    pub topics: usize,
    /// Total partition count summed across all topics.
    pub partitions: usize,
    /// Number of registered consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced (from the broker metrics snapshot).
    pub messages_produced: u64,
    /// Total messages consumed (from the broker metrics snapshot).
    pub messages_consumed: u64,
}
2535
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
/// Per-topic summary entry returned by `GET /kafka/topics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions in the topic.
    pub partitions: usize,
    /// Configured replication factor.
    pub replication_factor: i32,
}
2544
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
/// Per-group summary entry returned by `GET /kafka/groups`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier.
    pub group_id: String,
    /// Number of members currently in the group.
    pub members: usize,
    /// Reported group state (hard-coded to "Stable" by the handler).
    pub state: String,
}
2553
#[cfg(feature = "kafka")]
/// `GET /kafka/stats` — report aggregate broker statistics.
///
/// Responds 503 with a JSON error body when no Kafka broker is configured.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.kafka_broker {
        Some(broker) => {
            let topic_map = broker.topics.read().await;
            let group_registry = broker.consumer_groups.read().await;

            // Sum partition counts over every topic.
            let partition_total: usize = topic_map.values().map(|t| t.partitions.len()).sum();

            let snapshot = broker.metrics().snapshot();

            let stats = KafkaBrokerStats {
                topics: topic_map.len(),
                partitions: partition_total,
                consumer_groups: group_registry.groups().len(),
                messages_produced: snapshot.messages_produced_total,
                messages_consumed: snapshot.messages_consumed_total,
            };

            Json(stats).into_response()
        }
        None => (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response(),
    }
}
2586
#[cfg(feature = "kafka")]
/// `GET /kafka/topics` — list every topic with its partition count and
/// replication factor. 503 when no broker is configured.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    // Guard clause: bail out early when the broker is absent.
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response();
        }
    };

    let topics = broker.topics.read().await;
    let mut topic_list = Vec::with_capacity(topics.len());
    for (name, topic) in topics.iter() {
        topic_list.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({
        "topics": topic_list
    }))
    .into_response()
}
2616
#[cfg(feature = "kafka")]
/// `GET /kafka/topics/{topic}` — detailed view of a single topic,
/// including per-partition message counts.
///
/// Responds 404 when the topic is unknown, 503 when no broker is
/// configured.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    match &state.kafka_broker {
        Some(broker) => {
            let topics = broker.topics.read().await;
            match topics.get(&topic_name) {
                Some(topic) => {
                    // Partition ids are positional; leader/replicas are
                    // hard-coded to broker 0.
                    let partitions_detail: Vec<_> = topic
                        .partitions
                        .iter()
                        .enumerate()
                        .map(|(idx, partition)| {
                            serde_json::json!({
                                "id": idx as i32,
                                "leader": 0,
                                "replicas": vec![0],
                                "message_count": partition.messages.len()
                            })
                        })
                        .collect();

                    Json(serde_json::json!({
                        "name": topic_name,
                        "partitions": topic.partitions.len(),
                        "replication_factor": topic.config.replication_factor,
                        "partitions_detail": partitions_detail
                    }))
                    .into_response()
                }
                None => (
                    StatusCode::NOT_FOUND,
                    Json(serde_json::json!({
                        "error": "Topic not found",
                        "topic": topic_name
                    })),
                )
                    .into_response(),
            }
        }
        None => (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response(),
    }
}
2658
#[cfg(feature = "kafka")]
/// `GET /kafka/groups` — list consumer groups with member counts.
/// 503 when no broker is configured.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.kafka_broker {
        Some(broker) => {
            let registry = broker.consumer_groups.read().await;
            let mut group_list = Vec::new();
            for (group_id, group) in registry.groups().iter() {
                group_list.push(KafkaConsumerGroupInfo {
                    group_id: group_id.clone(),
                    members: group.members.len(),
                    // State is hard-coded to "Stable".
                    state: "Stable".to_string(),
                });
            }

            Json(serde_json::json!({
                "groups": group_list
            }))
            .into_response()
        }
        None => (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response(),
    }
}
2689
#[cfg(feature = "kafka")]
/// `GET /kafka/groups/{group_id}` — detailed view of one consumer group:
/// member list with partition assignments, plus committed offsets.
///
/// Responds 404 when the group is unknown, 503 when no broker is
/// configured.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let consumer_groups = broker.consumer_groups.read().await;
        if let Some(group) = consumer_groups.groups().get(&group_id) {
            // Group state is not tracked; always reported as "Stable".
            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                "state": "Stable",
                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
                    "member_id": member_id,
                    "client_id": member.client_id,
                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })).collect::<Vec<_>>()
                })).collect::<Vec<_>>(),
                // Offsets are keyed by (topic, partition) pairs.
                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
                    "topic": topic,
                    "partition": partition,
                    "offset": offset
                })).collect::<Vec<_>>()
            })).into_response()
        } else {
            (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Consumer group not found",
                    "group_id": group_id
                })),
            )
                .into_response()
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2738
#[cfg(feature = "kafka")]
/// Request body for `POST /kafka/produce` (also used per-message in the
/// batch endpoint).
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Destination topic; created on demand if it does not exist.
    pub topic: String,
    /// Optional message key; used for partition assignment when no
    /// explicit partition is given.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload, stored as its UTF-8 bytes.
    pub value: String,
    /// Optional explicit partition; assigned from the key when omitted.
    /// Out-of-range values are rejected with 400.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional message headers (string keys and values).
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2759
#[cfg(feature = "kafka")]
/// `POST /kafka/produce` — produce a single message to a topic.
///
/// The topic is auto-created with default configuration on first use.
/// If no partition is specified, one is assigned from the message key.
/// Responds 400 for an out-of-range partition, 500 on a produce failure,
/// and 503 when no broker is configured.
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    // Guard clause: the broker is matched exactly once; the previous
    // implementation re-checked `state.kafka_broker` inside the success
    // path, which could never be `None` there.
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response();
        }
    };

    let mut topics = broker.topics.write().await;

    // Auto-create the topic with default configuration on first use.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // An explicit partition wins; otherwise derive one from the key.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Build the stored message. Borrow key/headers here so the owned
    // values can move into the SSE event below without extra clones.
    let message = mockforge_kafka::partitions::KafkaMessage {
        // The real offset is assigned by the partition on produce.
        offset: 0,
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .iter()
            .flatten()
            .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
            .collect(),
    };

    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            broker.metrics().record_messages_produced(1);

            // Notify SSE stream subscribers; a send error just means no
            // subscriber is currently listening and is safe to ignore.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key,
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2864
#[cfg(feature = "kafka")]
/// Request body for `POST /kafka/produce/batch`.
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, in order; at least one is required.
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between consecutive messages, in milliseconds.
    /// Default comes from `default_delay` (defined elsewhere in this
    /// file — confirm its value there).
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2875
#[cfg(feature = "kafka")]
/// `POST /kafka/produce/batch` — produce multiple messages sequentially,
/// optionally pausing `delay_ms` between consecutive messages.
///
/// Per-message failures (invalid partition, produce error) are reported
/// in the `results` array rather than aborting the batch. The topics
/// write lock is held only while producing each message — it is released
/// before the inter-message delay so other tasks are not blocked while
/// the batch sleeps. Responds 400 for an empty batch, 503 when no broker
/// is configured.
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response();
        }
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let mut results = Vec::with_capacity(request.messages.len());

    for (index, msg_request) in request.messages.iter().enumerate() {
        // Scope the write guard so it is dropped before the delay below.
        {
            let mut topics = broker.topics.write().await;

            // Auto-create the topic with default configuration.
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            // Explicit partition wins; otherwise derive one from the key.
            let partition_id = match msg_request.partition {
                Some(partition) => partition,
                None => {
                    topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
                }
            };

            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                // Skip straight to the next message (no delay), matching
                // the error path of a failed produce.
                continue;
            }

            let message = mockforge_kafka::partitions::KafkaMessage {
                // The real offset is assigned by the partition on produce.
                offset: 0,
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.as_ref().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .iter()
                    .flatten()
                    .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
                    .collect(),
            };

            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    broker.metrics().record_messages_produced(1);

                    // Notify SSE stream subscribers; ignore send errors
                    // (no active subscribers).
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }
        }

        // Optional pacing between messages; the lock is already released.
        if index < request.messages.len() - 1 && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    Json(serde_json::json!({
        "success": true,
        "total": request.messages.len(),
        "succeeded": success_count,
        "failed": request.messages.len() - success_count,
        "results": results
    }))
    .into_response()
}
3004
#[cfg(feature = "mqtt")]
/// `GET /mqtt/messages/stream` — server-sent-events stream of MQTT
/// messages flowing through the shared broadcast channel.
///
/// An optional `topic` query parameter filters to topics containing the
/// given substring. Matching messages are emitted as SSE `mqtt_message`
/// events; Kafka events on the same channel are skipped.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // Unfold a stream from the broadcast receiver: each step loops until
    // an event worth emitting arrives, then yields it with the receiver
    // as the next unfold state.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Substring match against the topic filter, if any.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failure is silently skipped; the
                        // loop just waits for the next event.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        // Kafka events are not ours; keep waiting.
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        // All senders dropped: end the SSE stream.
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        // Receiver fell behind the broadcast buffer; log
                        // the loss and resume from the current position.
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments prevent idle-connection timeouts.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3067
#[cfg(feature = "kafka")]
/// `GET /kafka/messages/stream` — server-sent-events stream of Kafka
/// messages flowing through the shared broadcast channel.
///
/// An optional `topic` query parameter filters to topics containing the
/// given substring. Matching messages are emitted as SSE `kafka_message`
/// events; MQTT events on the same channel are skipped.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // Unfold a stream from the broadcast receiver: each step loops until
    // a matching Kafka event arrives, then yields it with the receiver
    // as the next unfold state.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        // MQTT events are not ours; keep waiting.
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Substring match against the topic filter, if any.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failure is silently skipped; the
                        // loop just waits for the next event.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        // All senders dropped: end the SSE stream.
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        // Receiver fell behind the broadcast buffer; log
                        // the loss and resume from the current position.
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments prevent idle-connection timeouts.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3131
/// Request body for `POST /ai/generate-spec`.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the API to generate.
    pub query: String,
    /// Specification flavor: "openapi", "graphql", or "asyncapi";
    /// anything else falls back to OpenAPI 3.0.
    pub spec_type: String,
    /// OpenAPI version to target; defaults to "3.0.0" when omitted.
    pub api_version: Option<String>,
}
3144
/// Request body for `POST /mockai/generate-openapi` — infer an OpenAPI
/// spec from recorded HTTP traffic.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to `./recordings.db`
    /// when omitted.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Lower bound on recording time, as an RFC 3339 timestamp.
    #[serde(default)]
    pub since: Option<String>,
    /// Upper bound on recording time, as an RFC 3339 timestamp.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern used to filter recorded exchanges.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence for inferred paths; defaults to 0.7.
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
3164
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    // Named constant documents intent at the use site.
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
3168
#[cfg(feature = "data-faker")]
/// `POST /ai/generate-spec` — generate an API specification (OpenAPI,
/// GraphQL, or AsyncAPI) from a natural-language description via the
/// RAG engine.
///
/// Configuration comes entirely from environment variables:
/// `MOCKFORGE_RAG_API_KEY` / `OPENAI_API_KEY` (one required),
/// `MOCKFORGE_RAG_PROVIDER`, `MOCKFORGE_RAG_API_ENDPOINT`,
/// `MOCKFORGE_RAG_MODEL`, `MOCKFORGE_RAG_MAX_TOKENS`,
/// `MOCKFORGE_RAG_TEMPERATURE`, `MOCKFORGE_RAG_TIMEOUT`,
/// `MOCKFORGE_RAG_CONTEXT_WINDOW`. Responds 503 when no API key is set,
/// 500 on engine-initialization or generation failure.
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::DocumentStorage,
    };
    use std::sync::Arc;

    // Prefer the MockForge-specific key; fall back to the OpenAI one.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
        .into_response();
    }

    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    // Unknown provider strings silently fall back to OpenAI.
    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Endpoint and model defaults are provider-specific.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Numeric settings fall back to their defaults on parse failure.
    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key,
        model,
        max_tokens: std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
            .unwrap_or_else(|_| "4096".to_string())
            .parse()
            .unwrap_or(4096),
        temperature: std::env::var("MOCKFORGE_RAG_TEMPERATURE")
            .unwrap_or_else(|_| "0.3".to_string())
            .parse()
            .unwrap_or(0.3), timeout_secs: std::env::var("MOCKFORGE_RAG_TIMEOUT")
            .unwrap_or_else(|_| "60".to_string())
            .parse()
            .unwrap_or(60),
        max_context_length: std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
            .unwrap_or_else(|_| "4000".to_string())
            .parse()
            .unwrap_or(4000),
        ..Default::default()
    };

    // Human-readable label used inside the prompt text.
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // No retrieval corpus is added; an empty in-memory store satisfies
    // the engine's storage requirement.
    use mockforge_data::rag::storage::InMemoryStorage;
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
            .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Strip any surrounding chatter from the model output; the
            // extractors are defined elsewhere in this file.
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
        .into_response(),
    }
}
3341
3342#[cfg(not(feature = "data-faker"))]
3343async fn generate_ai_spec(
3344 State(_state): State<ManagementState>,
3345 Json(_request): Json<GenerateSpecRequest>,
3346) -> impl IntoResponse {
3347 (
3348 StatusCode::NOT_IMPLEMENTED,
3349 Json(serde_json::json!({
3350 "error": "AI features not enabled",
3351 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3352 })),
3353 )
3354 .into_response()
3355}
3356
#[cfg(feature = "behavioral-cloning")]
/// `POST /mockai/generate-openapi` — infer an OpenAPI specification from
/// HTTP traffic recorded in the recorder database.
///
/// Optional filters: `database_path` (defaults to `./recordings.db`),
/// `since`/`until` (RFC 3339 timestamps), `path_pattern`, and
/// `min_confidence` (defaults to 0.7). At most 1000 recorded requests
/// are analyzed. Responds 400 on bad input, 404 when no matching
/// exchanges exist, 500 on query/generation/serialization failure.
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Default to ./recordings.db in the current working directory.
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Shared 400 response for a malformed `since`/`until` timestamp —
    // previously this construction was copy-pasted for each parameter.
    let bad_date = |flag: &str, e: chrono::ParseError| {
        (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid date format",
                "message": format!("Invalid --{} format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", flag, e)
            })),
        )
            .into_response()
    };

    let since_dt = match request.since.as_deref() {
        Some(s) => match DateTime::parse_from_rfc3339(s) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => return bad_date("since", e),
        },
        None => None,
    };

    let until_dt = match request.until.as_deref() {
        Some(s) => match DateTime::parse_from_rfc3339(s) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => return bad_date("until", e),
        },
        None => None,
    };

    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        // Cap the scan so a large recording database cannot stall us.
        max_requests: Some(1000),
    };

    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Bridge the recorder's exchange type to the generator's local type.
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prefer the raw generated document when present; otherwise
    // serialize the typed spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3549
3550async fn list_rule_explanations(
3552 State(state): State<ManagementState>,
3553 Query(params): Query<std::collections::HashMap<String, String>>,
3554) -> impl IntoResponse {
3555 use mockforge_core::intelligent_behavior::RuleType;
3556
3557 let explanations = state.rule_explanations.read().await;
3558 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3559
3560 if let Some(rule_type_str) = params.get("rule_type") {
3562 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3563 explanations_vec.retain(|e| e.rule_type == rule_type);
3564 }
3565 }
3566
3567 if let Some(min_confidence_str) = params.get("min_confidence") {
3569 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3570 explanations_vec.retain(|e| e.confidence >= min_confidence);
3571 }
3572 }
3573
3574 explanations_vec.sort_by(|a, b| {
3576 b.confidence
3577 .partial_cmp(&a.confidence)
3578 .unwrap_or(std::cmp::Ordering::Equal)
3579 .then_with(|| b.generated_at.cmp(&a.generated_at))
3580 });
3581
3582 Json(serde_json::json!({
3583 "explanations": explanations_vec,
3584 "total": explanations_vec.len(),
3585 }))
3586 .into_response()
3587}
3588
3589async fn get_rule_explanation(
3591 State(state): State<ManagementState>,
3592 Path(rule_id): Path<String>,
3593) -> impl IntoResponse {
3594 let explanations = state.rule_explanations.read().await;
3595
3596 match explanations.get(&rule_id) {
3597 Some(explanation) => Json(serde_json::json!({
3598 "explanation": explanation,
3599 }))
3600 .into_response(),
3601 None => (
3602 StatusCode::NOT_FOUND,
3603 Json(serde_json::json!({
3604 "error": "Rule explanation not found",
3605 "message": format!("No explanation found for rule ID: {}", rule_id)
3606 })),
3607 )
3608 .into_response(),
3609 }
3610}
3611
/// Request body for `POST /mockai/learn`: request/response example pairs
/// to derive behavior rules from.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example pairs; at least one is required (400 otherwise).
    pub examples: Vec<ExamplePairRequest>,
    /// Optional free-form configuration JSON — exact schema is consumed
    /// by the learn handler; confirm against its implementation.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3621
/// One observed request/response pair used as a learning example.
///
/// Both sides are loosely-typed JSON; the handler extracts well-known keys
/// (`method`, `path`, `body`, `query_params`, `headers` on the request;
/// `status_code`/`status` and `body` on the response).
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request description (expected keys: method, path, body, query_params, headers).
    pub request: serde_json::Value,
    /// Response description (expected keys: status_code or status, body).
    pub response: serde_json::Value,
}
3630
/// POST handler: learn mock-behavior rules from example request/response pairs.
///
/// Converts the loosely-typed JSON examples into `ExamplePair`s, runs the
/// `RuleGenerator`, caches the returned explanations in
/// `state.rule_explanations`, and responds with a summary of the generated
/// rules and their explanations.
async fn learn_from_examples(
    State(state): State<ManagementState>,
    Json(request): Json<LearnFromExamplesRequest>,
) -> impl IntoResponse {
    use mockforge_core::intelligent_behavior::{
        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
        rule_generator::{ExamplePair, RuleGenerator},
    };

    // Reject an empty example list up front with a 400.
    if request.examples.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "No examples provided",
                "message": "At least one example pair is required"
            })),
        )
        .into_response();
    }

    // Convert each JSON example into an ExamplePair, defaulting missing
    // fields (method -> "GET", path -> "/", status -> 200).
    // NOTE(review): this closure never returns Err, so the BAD_REQUEST
    // branch below is currently unreachable — the Result wrapper appears to
    // be kept for future fallible validation; confirm.
    let example_pairs: Result<Vec<ExamplePair>, String> = request
        .examples
        .into_iter()
        .enumerate()
        .map(|(idx, ex)| {
            let method = ex
                .request
                .get("method")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string())
                .unwrap_or_else(|| "GET".to_string());
            let path = ex
                .request
                .get("path")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string())
                .unwrap_or_else(|| "/".to_string());
            let request_body = ex.request.get("body").cloned();
            // Only string-valued query params / headers are kept; non-string
            // values are silently dropped.
            let query_params = ex
                .request
                .get("query_params")
                .and_then(|v| v.as_object())
                .map(|obj| {
                    obj.iter()
                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
                        .collect()
                })
                .unwrap_or_default();
            let headers = ex
                .request
                .get("headers")
                .and_then(|v| v.as_object())
                .map(|obj| {
                    obj.iter()
                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
                        .collect()
                })
                .unwrap_or_default();

            // Accept either "status_code" or "status" for the response status.
            let status = ex
                .response
                .get("status_code")
                .or_else(|| ex.response.get("status"))
                .and_then(|v| v.as_u64())
                .map(|n| n as u16)
                .unwrap_or(200);
            let response_body = ex.response.get("body").cloned();

            Ok(ExamplePair {
                method,
                path,
                request: request_body,
                status,
                response: response_body,
                query_params,
                headers,
                // Tag each pair with its origin and position for traceability.
                metadata: {
                    let mut meta = std::collections::HashMap::new();
                    meta.insert("source".to_string(), "api".to_string());
                    meta.insert("example_index".to_string(), idx.to_string());
                    meta
                },
            })
        })
        .collect();

    let example_pairs = match example_pairs {
        Ok(pairs) => pairs,
        Err(e) => {
            // Currently unreachable (see NOTE above); kept as the error path
            // for any future fallible conversion.
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid examples",
                    "message": e
                })),
            )
            .into_response();
        }
    };

    // NOTE(review): an invalid `config` payload silently falls back to the
    // default configuration instead of returning 400 — confirm intended.
    let behavior_config = if let Some(config_json) = request.config {
        serde_json::from_value(config_json)
            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
            .behavior_model
    } else {
        BehaviorModelConfig::default()
    };

    let generator = RuleGenerator::new(behavior_config);

    let (rules, explanations) =
        match generator.generate_rules_with_explanations(example_pairs).await {
            Ok(result) => result,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Rule generation failed",
                        "message": format!("Failed to generate rules: {}", e)
                    })),
                )
                .into_response();
            }
        };

    // Cache the explanations so the rule-explanation endpoints can serve
    // them later; scope the write lock to this block.
    {
        let mut stored_explanations = state.rule_explanations.write().await;
        for explanation in &explanations {
            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
        }
    }

    // Summarize counts of each generated artifact plus a compact view of the
    // explanations (full explanations are available via the GET endpoints).
    let response = serde_json::json!({
        "success": true,
        "rules_generated": {
            "consistency_rules": rules.consistency_rules.len(),
            "schemas": rules.schemas.len(),
            "state_machines": rules.state_transitions.len(),
            "system_prompt": !rules.system_prompt.is_empty(),
        },
        "explanations": explanations.iter().map(|e| serde_json::json!({
            "rule_id": e.rule_id,
            "rule_type": e.rule_type,
            "confidence": e.confidence,
            "reasoning": e.reasoning,
        })).collect::<Vec<_>>(),
        "total_explanations": explanations.len(),
    });

    Json(response).into_response()
}
3795
/// Extract an OpenAPI/AsyncAPI YAML spec from LLM output that may wrap it in
/// a Markdown code fence.
///
/// Tries, in order: a ```` ```yaml ```` fence, then any generic ```` ``` ````
/// fence (skipping the language tag on the opening fence line), and finally
/// falls back to the trimmed raw text — which also covers bare
/// `openapi:` / `asyncapi:` documents.
///
/// Fixes over the previous version: the generic-fence branch no longer leaks
/// the language tag into the result (e.g. "```json\nfoo\n```" now yields
/// "foo", not "json\nfoo"), and the redundant `openapi:`/`asyncapi:` prefix
/// check — which returned exactly what the fallback returns — was removed.
#[cfg(feature = "data-faker")]
fn extract_yaml_spec(text: &str) -> String {
    /// Content between `open` and the next closing ``` fence, if both exist.
    fn fenced(text: &str, open: &str) -> Option<String> {
        let after_open = text.find(open)? + open.len();
        let rest = &text[after_open..];
        // Treat the remainder of the opening-fence line as a language tag and
        // skip it; inline fences without a newline keep the old behavior of
        // just trimming leading whitespace.
        let body = match rest.find('\n') {
            Some(i) => &rest[i + 1..],
            None => rest.trim_start(),
        };
        let end = body.find("```")?;
        Some(body[..end].trim().to_string())
    }

    fenced(text, "```yaml")
        .or_else(|| fenced(text, "```"))
        // No (closed) fence found: return the raw text, trimmed.
        .unwrap_or_else(|| text.trim().to_string())
}
3820
/// Extract a GraphQL SDL schema from LLM output that may wrap it in a
/// Markdown code fence.
///
/// Tries, in order: a ```` ```graphql ```` fence, then any generic
/// ```` ``` ```` fence (skipping the language tag on the opening fence line),
/// and finally falls back to the trimmed raw text — which also covers bare
/// `type ...` / `schema ...` documents.
///
/// Fixes over the previous version: the generic-fence branch no longer leaks
/// the language tag into the result (e.g. "```sdl\nfoo\n```" now yields
/// "foo", not "sdl\nfoo"), and the redundant `type `/`schema ` prefix check
/// — which returned exactly what the fallback returns — was removed.
#[cfg(feature = "data-faker")]
fn extract_graphql_schema(text: &str) -> String {
    /// Content between `open` and the next closing ``` fence, if both exist.
    fn fenced(text: &str, open: &str) -> Option<String> {
        let after_open = text.find(open)? + open.len();
        let rest = &text[after_open..];
        // Skip the rest of the opening-fence line (language tag); inline
        // fences without a newline keep the old trim-leading behavior.
        let body = match rest.find('\n') {
            Some(i) => &rest[i + 1..],
            None => rest.trim_start(),
        };
        let end = body.find("```")?;
        Some(body[..end].trim().to_string())
    }

    fenced(text, "```graphql")
        .or_else(|| fenced(text, "```"))
        // No (closed) fence found: return the raw text, trimmed.
        .unwrap_or_else(|| text.trim().to_string())
}
3845
/// GET handler: report the current chaos-engineering configuration.
///
/// With the `chaos` feature compiled in and a chaos API state attached, this
/// snapshots the live config; in every other case it reports a disabled
/// config with null sections, so the response shape is stable across builds.
async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &_state.chaos_api_state {
            // Hold the read lock only long enough to serialize the snapshot.
            let config = chaos_state.config.read().await;
            Json(serde_json::json!({
                "enabled": config.enabled,
                // Each section is optional; present sections are serialized,
                // and a serialization failure degrades to JSON null.
                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
            }))
            .into_response()
        } else {
            // Feature compiled in but no chaos state wired up: report disabled.
            Json(serde_json::json!({
                "enabled": false,
                "latency": null,
                "fault_injection": null,
                "rate_limit": null,
                "traffic_shaping": null,
            }))
            .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        // Feature not compiled: always report a disabled configuration.
        Json(serde_json::json!({
            "enabled": false,
            "latency": null,
            "fault_injection": null,
            "rate_limit": null,
            "traffic_shaping": null,
        }))
        .into_response()
    }
}
3888
/// Partial update payload for the chaos-engineering configuration.
///
/// Every field is optional; only fields present in the request are applied.
/// Sub-configs arrive as raw JSON and are deserialized against the
/// corresponding `mockforge_chaos::config` types by the handler.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master on/off switch for chaos injection.
    pub enabled: Option<bool>,
    /// Latency-injection settings (deserialized as `LatencyConfig`).
    pub latency: Option<serde_json::Value>,
    /// Fault-injection settings (deserialized as `FaultInjectionConfig`).
    pub fault_injection: Option<serde_json::Value>,
    /// Rate-limiting settings (deserialized as `RateLimitConfig`).
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic-shaping settings (deserialized as `TrafficShapingConfig`).
    pub traffic_shaping: Option<serde_json::Value>,
}
3903
/// PUT handler: apply a partial chaos-engineering configuration update.
///
/// Each present field of [`ChaosConfigUpdate`] overwrites the corresponding
/// section of the live config. NOTE(review): sub-config payloads that fail
/// to deserialize are silently skipped rather than reported to the caller —
/// confirm this best-effort behavior is intended.
async fn update_chaos_config(
    State(_state): State<ManagementState>,
    Json(_config_update): Json<ChaosConfigUpdate>,
) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &_state.chaos_api_state {
            use mockforge_chaos::config::{
                FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig,
            };

            let mut config = chaos_state.config.write().await;

            if let Some(enabled) = _config_update.enabled {
                config.enabled = enabled;
            }

            // For each optional section: overwrite only when the field is
            // present AND deserializes into the typed config.
            if let Some(latency_json) = _config_update.latency {
                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
                    config.latency = Some(latency);
                }
            }

            if let Some(fault_json) = _config_update.fault_injection {
                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
                    config.fault_injection = Some(fault);
                }
            }

            if let Some(rate_json) = _config_update.rate_limit {
                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
                    config.rate_limit = Some(rate);
                }
            }

            if let Some(traffic_json) = _config_update.traffic_shaping {
                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
                    config.traffic_shaping = Some(traffic);
                }
            }

            // Release the write lock before logging and responding.
            drop(config);

            info!("Chaos configuration updated successfully");
            Json(serde_json::json!({
                "success": true,
                "message": "Chaos configuration updated and applied"
            }))
            .into_response()
        } else {
            // Feature compiled in but no chaos state attached.
            (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "success": false,
                    "error": "Chaos API not available",
                    "message": "Chaos engineering is not enabled or configured"
                })),
            )
            .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        // Feature not compiled into this build.
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "success": false,
                "error": "Chaos feature not enabled",
                "message": "Chaos engineering feature is not compiled into this build"
            })),
        )
        .into_response()
    }
}
3986
3987async fn list_network_profiles() -> impl IntoResponse {
3991 use mockforge_core::network_profiles::NetworkProfileCatalog;
3992
3993 let catalog = NetworkProfileCatalog::default();
3994 let profiles: Vec<serde_json::Value> = catalog
3995 .list_profiles_with_description()
3996 .iter()
3997 .map(|(name, description)| {
3998 serde_json::json!({
3999 "name": name,
4000 "description": description,
4001 })
4002 })
4003 .collect();
4004
4005 Json(serde_json::json!({
4006 "profiles": profiles
4007 }))
4008 .into_response()
4009}
4010
/// Request body for applying a named network profile.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of a profile from the built-in `NetworkProfileCatalog`.
    pub profile_name: String,
}
4017
/// POST handler: look up a named network profile and apply its traffic
/// shaping to (a) the persisted server configuration, when available, and
/// (b) the live chaos API state, when the `chaos` feature is enabled.
/// Returns 404 if the profile name is unknown.
async fn apply_network_profile(
    State(state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        // Step 1: persist the shaping settings into the server configuration
        // (best-effort — skipped with a warning when no config is attached).
        if let Some(server_config) = &state.server_config {
            let mut config = server_config.write().await;

            use mockforge_core::config::NetworkShapingConfig;

            // Shaping is enabled if either bandwidth limiting or burst-loss
            // is enabled in the profile. `* 8` converts bytes/sec to
            // bits/sec to match the `_bps` field.
            // NOTE(review): max_connections 1000 looks like a fixed default
            // cap not derived from the profile — confirm.
            let network_shaping = NetworkShapingConfig {
                enabled: profile.traffic_shaping.bandwidth.enabled
                    || profile.traffic_shaping.burst_loss.enabled,
                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                max_connections: 1000, };

            // Update the existing chaos section, or create one carrying only
            // the traffic-shaping settings.
            if let Some(ref mut chaos) = config.observability.chaos {
                chaos.traffic_shaping = Some(network_shaping);
            } else {
                use mockforge_core::config::ChaosEngConfig;
                config.observability.chaos = Some(ChaosEngConfig {
                    enabled: true,
                    latency: None,
                    fault_injection: None,
                    rate_limit: None,
                    traffic_shaping: Some(network_shaping),
                    scenario: None,
                });
            }

            info!("Network profile '{}' applied to server configuration", request.profile_name);
        } else {
            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
        }

        // Step 2: apply the same settings to the live chaos engine state so
        // the profile takes effect immediately (chaos builds only).
        #[cfg(feature = "chaos")]
        {
            if let Some(chaos_state) = &state.chaos_api_state {
                use mockforge_chaos::config::TrafficShapingConfig;

                let mut chaos_config = chaos_state.config.write().await;
                // Same bytes/sec -> bits/sec conversion as above.
                // NOTE(review): max_connections is 0 here vs 1000 above —
                // presumably 0 means "unlimited" for the chaos engine;
                // confirm the intended semantics.
                let chaos_traffic_shaping = TrafficShapingConfig {
                    enabled: profile.traffic_shaping.bandwidth.enabled
                        || profile.traffic_shaping.burst_loss.enabled,
                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                    max_connections: 0,
                    connection_timeout_ms: 30000,
                };
                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
                // Force-enable chaos so the shaping is active, then release
                // the write lock before logging.
                chaos_config.enabled = true; drop(chaos_config);
                info!("Network profile '{}' applied to chaos API state", request.profile_name);
            }
        }

        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
        .into_response()
    }
}
4111
4112pub fn management_router_with_ui_builder(
4114 state: ManagementState,
4115 server_config: mockforge_core::config::ServerConfig,
4116) -> Router {
4117 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4118
4119 let management = management_router(state);
4121
4122 let ui_builder_state = UIBuilderState::new(server_config);
4124 let ui_builder = create_ui_builder_router(ui_builder_state);
4125
4126 management.nest("/ui-builder", ui_builder)
4128}
4129
4130pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4132 use crate::spec_import::{spec_import_router, SpecImportState};
4133
4134 let management = management_router(state);
4136
4137 Router::new()
4139 .merge(management)
4140 .merge(spec_import_router(SpecImportState::new()))
4141}
4142
4143#[cfg(test)]
4144mod tests {
4145 use super::*;
4146
4147 #[tokio::test]
4148 async fn test_create_and_get_mock() {
4149 let state = ManagementState::new(None, None, 3000);
4150
4151 let mock = MockConfig {
4152 id: "test-1".to_string(),
4153 name: "Test Mock".to_string(),
4154 method: "GET".to_string(),
4155 path: "/test".to_string(),
4156 response: MockResponse {
4157 body: serde_json::json!({"message": "test"}),
4158 headers: None,
4159 },
4160 enabled: true,
4161 latency_ms: None,
4162 status_code: Some(200),
4163 request_match: None,
4164 priority: None,
4165 scenario: None,
4166 required_scenario_state: None,
4167 new_scenario_state: None,
4168 };
4169
4170 {
4172 let mut mocks = state.mocks.write().await;
4173 mocks.push(mock.clone());
4174 }
4175
4176 let mocks = state.mocks.read().await;
4178 let found = mocks.iter().find(|m| m.id == "test-1");
4179 assert!(found.is_some());
4180 assert_eq!(found.unwrap().name, "Test Mock");
4181 }
4182
4183 #[tokio::test]
4184 async fn test_server_stats() {
4185 let state = ManagementState::new(None, None, 3000);
4186
4187 {
4189 let mut mocks = state.mocks.write().await;
4190 mocks.push(MockConfig {
4191 id: "1".to_string(),
4192 name: "Mock 1".to_string(),
4193 method: "GET".to_string(),
4194 path: "/test1".to_string(),
4195 response: MockResponse {
4196 body: serde_json::json!({}),
4197 headers: None,
4198 },
4199 enabled: true,
4200 latency_ms: None,
4201 status_code: Some(200),
4202 request_match: None,
4203 priority: None,
4204 scenario: None,
4205 required_scenario_state: None,
4206 new_scenario_state: None,
4207 });
4208 mocks.push(MockConfig {
4209 id: "2".to_string(),
4210 name: "Mock 2".to_string(),
4211 method: "POST".to_string(),
4212 path: "/test2".to_string(),
4213 response: MockResponse {
4214 body: serde_json::json!({}),
4215 headers: None,
4216 },
4217 enabled: false,
4218 latency_ms: None,
4219 status_code: Some(201),
4220 request_match: None,
4221 priority: None,
4222 scenario: None,
4223 required_scenario_state: None,
4224 new_scenario_state: None,
4225 });
4226 }
4227
4228 let mocks = state.mocks.read().await;
4229 assert_eq!(mocks.len(), 2);
4230 assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
4231 }
4232
4233 #[test]
4234 fn test_mock_matches_request_with_xpath_absolute_path() {
4235 let mock = MockConfig {
4236 id: "xpath-1".to_string(),
4237 name: "XPath Match".to_string(),
4238 method: "POST".to_string(),
4239 path: "/xml".to_string(),
4240 response: MockResponse {
4241 body: serde_json::json!({"ok": true}),
4242 headers: None,
4243 },
4244 enabled: true,
4245 latency_ms: None,
4246 status_code: Some(200),
4247 request_match: Some(RequestMatchCriteria {
4248 xpath: Some("/root/order/id".to_string()),
4249 ..Default::default()
4250 }),
4251 priority: None,
4252 scenario: None,
4253 required_scenario_state: None,
4254 new_scenario_state: None,
4255 };
4256
4257 let body = br#"<root><order><id>123</id></order></root>"#;
4258 let headers = std::collections::HashMap::new();
4259 let query = std::collections::HashMap::new();
4260
4261 assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4262 }
4263
4264 #[test]
4265 fn test_mock_matches_request_with_xpath_text_predicate() {
4266 let mock = MockConfig {
4267 id: "xpath-2".to_string(),
4268 name: "XPath Predicate Match".to_string(),
4269 method: "POST".to_string(),
4270 path: "/xml".to_string(),
4271 response: MockResponse {
4272 body: serde_json::json!({"ok": true}),
4273 headers: None,
4274 },
4275 enabled: true,
4276 latency_ms: None,
4277 status_code: Some(200),
4278 request_match: Some(RequestMatchCriteria {
4279 xpath: Some("//order/id[text()='123']".to_string()),
4280 ..Default::default()
4281 }),
4282 priority: None,
4283 scenario: None,
4284 required_scenario_state: None,
4285 new_scenario_state: None,
4286 };
4287
4288 let body = br#"<root><order><id>123</id></order></root>"#;
4289 let headers = std::collections::HashMap::new();
4290 let query = std::collections::HashMap::new();
4291
4292 assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4293 }
4294
4295 #[test]
4296 fn test_mock_matches_request_with_xpath_no_match() {
4297 let mock = MockConfig {
4298 id: "xpath-3".to_string(),
4299 name: "XPath No Match".to_string(),
4300 method: "POST".to_string(),
4301 path: "/xml".to_string(),
4302 response: MockResponse {
4303 body: serde_json::json!({"ok": true}),
4304 headers: None,
4305 },
4306 enabled: true,
4307 latency_ms: None,
4308 status_code: Some(200),
4309 request_match: Some(RequestMatchCriteria {
4310 xpath: Some("//order/id[text()='456']".to_string()),
4311 ..Default::default()
4312 }),
4313 priority: None,
4314 scenario: None,
4315 required_scenario_state: None,
4316 new_scenario_state: None,
4317 };
4318
4319 let body = br#"<root><order><id>123</id></order></root>"#;
4320 let headers = std::collections::HashMap::new();
4321 let query = std::collections::HashMap::new();
4322
4323 assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4324 }
4325}