1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3use axum::{
8 extract::{Path, Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Json},
11 routing::{delete, get, post, put},
12 Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default capacity of the broadcast channel that fans broker message events
/// out to subscribers; overridable via `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY`.
#[cfg(any(feature = "mqtt", feature = "kafka"))]
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;
32
/// Resolve the message broadcast channel capacity.
///
/// Reads `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY`; an unset or unparsable value
/// falls back to [`DEFAULT_MESSAGE_BROADCAST_CAPACITY`].
#[cfg(any(feature = "mqtt", feature = "kafka"))]
fn get_message_broadcast_capacity() -> usize {
    match std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY),
        Err(_) => DEFAULT_MESSAGE_BROADCAST_CAPACITY,
    }
}
41
/// A message observed on one of the mock message brokers, tagged by protocol
/// when serialized (`{"protocol": "mqtt"|"kafka", "data": …}`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// An MQTT publish event (feature `mqtt` only).
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A Kafka record event (feature `kafka` only).
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
54
/// A single MQTT publish captured by the mock broker.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload as a string.
    pub payload: String,
    /// MQTT quality-of-service level (0, 1 or 2).
    pub qos: u8,
    /// Whether the broker retains this message for new subscribers.
    pub retain: bool,
    /// When the message was observed — format not visible here; presumably
    /// RFC 3339 like the rest of this module (TODO confirm at the producer).
    pub timestamp: String,
}
70
/// A single Kafka record captured by the mock broker.
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the record was produced to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value as a string.
    pub value: String,
    /// Partition the record landed on.
    pub partition: i32,
    /// Offset within the partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// When the record was observed (string-encoded; see producer for format).
    pub timestamp: String,
}
83
/// A user-defined mock endpoint managed through the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique id; generated server-side when left empty at creation.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// HTTP method to match (compared case-insensitively).
    pub method: String,
    /// Path pattern: exact, `{param}` segments, or `*` wildcards.
    pub path: String,
    /// Response returned when this mock matches.
    pub response: MockResponse,
    /// Whether the mock is active (defaults to `true`).
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional artificial latency in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional HTTP status code override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Extra request-matching criteria (headers, query, body patterns, …).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Relative priority among matching mocks — not consumed in this module;
    /// TODO confirm ordering semantics at the dispatch site.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Scenario this mock belongs to (scenario state machines).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// State the scenario must be in for this mock to apply — presumably
    /// enforced by the state machine manager; verify at the caller.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// State the scenario transitions to after this mock is served.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
}
123
/// Serde default for `MockConfig::enabled`: mocks are enabled unless stated.
fn default_true() -> bool {
    true
}
127
/// The response body and headers a mock returns when matched.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as arbitrary JSON.
    pub body: serde_json::Value,
    /// Optional response headers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
137
/// Fine-grained request matching criteria evaluated by `mock_matches_request`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RequestMatchCriteria {
    /// Required headers. Names are matched case-insensitively; values are
    /// treated as regexes when they compile, literals otherwise.
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub headers: std::collections::HashMap<String, String>,
    /// Required query parameters (exact value match only).
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub query_params: std::collections::HashMap<String, String>,
    /// Body pattern: regex when it compiles, literal comparison otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body_pattern: Option<String>,
    /// JSONPath that must exist in a JSON body (dot-path subset, `$.a.b`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_path: Option<String>,
    /// XPath that must match an XML body (subset; see `xml_xpath_exists`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub xpath: Option<String>,
    /// Custom matcher expression (see `evaluate_custom_matcher`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub custom_matcher: Option<String>,
}
160
/// Decide whether `mock` should serve a request described by method, path,
/// headers, query parameters and optional body.
///
/// All checks must pass: (1) the mock is enabled, (2) the method matches
/// case-insensitively, (3) the path matches the mock's pattern, and (4) every
/// criterion in `request_match`, when present.
pub fn mock_matches_request(
    mock: &MockConfig,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    if !mock.enabled {
        return false;
    }

    if mock.method.to_uppercase() != method.to_uppercase() {
        return false;
    }

    if !path_matches_pattern(&mock.path, path) {
        return false;
    }

    if let Some(criteria) = &mock.request_match {
        // Headers: names compared case-insensitively; expected values act as
        // regexes when they compile, literal equality otherwise.
        for (key, expected_value) in &criteria.headers {
            let header_key_lower = key.to_lowercase();
            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);

            if let Some((_, actual_value)) = found {
                if let Ok(re) = Regex::new(expected_value) {
                    if !re.is_match(actual_value) {
                        return false;
                    }
                } else if actual_value != expected_value {
                    return false;
                }
            } else {
                // Required header absent from the request.
                return false;
            }
        }

        // Query parameters: exact value match only (no regex support here).
        for (key, expected_value) in &criteria.query_params {
            if let Some(actual_value) = query_params.get(key) {
                if actual_value != expected_value {
                    return false;
                }
            } else {
                return false;
            }
        }

        // Body pattern: regex when it compiles, literal comparison otherwise.
        // A mock requiring a body pattern never matches a body-less request.
        if let Some(pattern) = &criteria.body_pattern {
            if let Some(body_bytes) = body {
                let body_str = String::from_utf8_lossy(body_bytes);
                if let Ok(re) = Regex::new(pattern) {
                    if !re.is_match(&body_str) {
                        return false;
                    }
                } else if body_str.as_ref() != pattern {
                    return false;
                }
            } else {
                return false;
            }
        }

        // JSONPath: rejects only when the body parses as JSON and the path is
        // absent. NOTE(review): unlike the other criteria, a missing or
        // non-JSON body silently passes this check — confirm the leniency is
        // intentional (the xpath branch below fails closed instead).
        if let Some(json_path) = &criteria.json_path {
            if let Some(body_bytes) = body {
                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
                        if !json_path_exists(&json_value, json_path) {
                            return false;
                        }
                    }
                }
            }
        }

        // XPath: requires a UTF-8 body containing a matching XML node.
        if let Some(xpath) = &criteria.xpath {
            if let Some(body_bytes) = body {
                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
                    if !xml_xpath_exists(body_str, xpath) {
                        return false;
                    }
                } else {
                    return false;
                }
            } else {
                return false;
            }
        }

        // Custom matcher mini-DSL (see `evaluate_custom_matcher`).
        if let Some(custom) = &criteria.custom_matcher {
            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
                return false;
            }
        }
    }

    true
}
282
283fn path_matches_pattern(pattern: &str, path: &str) -> bool {
285 if pattern == path {
287 return true;
288 }
289
290 if pattern == "*" {
292 return true;
293 }
294
295 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
297 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
298
299 if pattern_parts.len() != path_parts.len() {
300 if pattern.contains('*') {
302 return matches_wildcard_pattern(pattern, path);
303 }
304 return false;
305 }
306
307 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
308 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
310 continue; }
312
313 if pattern_part != path_part {
314 return false;
315 }
316 }
317
318 true
319}
320
321fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
323 use regex::Regex;
324
325 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
327
328 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
329 return re.is_match(path);
330 }
331
332 false
333}
334
335fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
337 if let Some(path) = json_path.strip_prefix("$.") {
340 let parts: Vec<&str> = path.split('.').collect();
341
342 let mut current = json;
343 for part in parts {
344 if let Some(obj) = current.as_object() {
345 if let Some(value) = obj.get(part) {
346 current = value;
347 } else {
348 return false;
349 }
350 } else {
351 return false;
352 }
353 }
354 true
355 } else {
356 tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
358 false
359 }
360}
361
/// One step of a simplified XPath: an element name plus an optional
/// `[text()='…']` equality predicate.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    name: String,
    text_equals: Option<String>,
}

/// Parse a single XPath step such as `item` or `item[text()='x']`.
///
/// Returns `None` for empty input, unterminated brackets, unsupported
/// predicates, unquoted predicate values, or a missing element name.
fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
    if segment.is_empty() {
        return None;
    }

    let trimmed = segment.trim();

    // No predicate: the whole (trimmed) step is the element name.
    let Some(bracket_start) = trimmed.find('[') else {
        return Some(XPathSegment {
            name: trimmed.to_string(),
            text_equals: None,
        });
    };

    if !trimmed.ends_with(']') {
        return None;
    }

    let name = trimmed[..bracket_start].trim();
    let predicate = trimmed[bracket_start + 1..trimmed.len() - 1].trim();

    // Only `text()=<quoted literal>` predicates are supported.
    let raw = predicate.strip_prefix("text()=")?.trim();
    let quoted = raw.len() >= 2
        && ((raw.starts_with('"') && raw.ends_with('"'))
            || (raw.starts_with('\'') && raw.ends_with('\'')));
    if !quoted || name.is_empty() {
        return None;
    }

    Some(XPathSegment {
        name: name.to_string(),
        text_equals: Some(raw[1..raw.len() - 1].to_string()),
    })
}
408
409fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
410 if !node.is_element() {
411 return false;
412 }
413 if node.tag_name().name() != segment.name {
414 return false;
415 }
416 match &segment.text_equals {
417 Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
418 None => true,
419 }
420}
421
/// Evaluate a minimal XPath subset against `xml_body`.
///
/// Supported expressions: `/a/b` (root-anchored) and `//a/b` (first segment
/// matches any descendant), where each segment is a tag name optionally
/// qualified by a `[text()='…']` predicate (see `parse_xpath_segment`).
/// Returns false for unparsable XML or unsupported expressions.
fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
    let doc = match roxmltree::Document::parse(xml_body) {
        Ok(doc) => doc,
        Err(err) => {
            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
            return false;
        }
    };

    let expr = xpath.trim();
    if expr.is_empty() {
        return false;
    }

    // `//` anchors the first segment at any descendant; a single `/` anchors
    // it at the document root element.
    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
        (true, rest)
    } else if let Some(rest) = expr.strip_prefix('/') {
        (false, rest)
    } else {
        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
        return false;
    };

    // NOTE: unparsable segments are silently dropped by filter_map — the
    // remaining segments are still evaluated.
    let segments: Vec<XPathSegment> = path_str
        .split('/')
        .filter(|s| !s.trim().is_empty())
        .filter_map(parse_xpath_segment)
        .collect();

    if segments.is_empty() {
        return false;
    }

    if is_descendant {
        // Try every descendant matching the first segment as a start point,
        // then walk the remaining segments strictly through children.
        let first = &segments[0];
        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
            let mut frontier = vec![node];
            for segment in &segments[1..] {
                let mut next_frontier = Vec::new();
                for parent in &frontier {
                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
                        next_frontier.push(child);
                    }
                }
                if next_frontier.is_empty() {
                    // Dead end from this start point; try the next candidate.
                    frontier.clear();
                    break;
                }
                frontier = next_frontier;
            }
            if !frontier.is_empty() {
                return true;
            }
        }
        false
    } else {
        // Root-anchored: the first segment must match the root element
        // itself; subsequent segments descend through children.
        let mut frontier = vec![doc.root_element()];
        for (index, segment) in segments.iter().enumerate() {
            let mut next_frontier = Vec::new();
            for parent in &frontier {
                if index == 0 {
                    if segment_matches(*parent, segment) {
                        next_frontier.push(*parent);
                    }
                    continue;
                }
                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
                    next_frontier.push(child);
                }
            }
            if next_frontier.is_empty() {
                return false;
            }
            frontier = next_frontier;
        }
        !frontier.is_empty()
    }
}
506
/// Evaluate a tiny custom-matcher DSL against the request.
///
/// Supported forms (detected in this order):
/// - `field == "value"` — exact equality
/// - `field =~ "regex"` — regex match
/// - `field contains "value"` — substring test
///
/// `field` is `method`, `path`, `headers.<name>`, `query.<name>`, or (for
/// `contains` only) `body`. Unknown fields and malformed expressions evaluate
/// to false, as do unknown expression formats (with a warning).
///
/// NOTE(review): operators are detected via substring search, so a quoted
/// value containing `==`, `=~` or the word `contains` would be mis-split —
/// confirm expressions are trusted/operator-supplied input.
fn evaluate_custom_matcher(
    expression: &str,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    let expr = expression.trim();

    // Equality: `field == "value"`.
    if expr.contains("==") {
        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let expected_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "method" => method == expected_value,
            "path" => path == expected_value,
            _ if field.starts_with("headers.") => {
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ if field.starts_with("query.") => {
                let param_name = &field[6..];
                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ => false,
        }
    }
    // Regex: `field =~ "pattern"`.
    else if expr.contains("=~") {
        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let pattern = parts[1].trim_matches('"').trim_matches('\'');

        if let Ok(re) = Regex::new(pattern) {
            match field {
                "method" => re.is_match(method),
                "path" => re.is_match(path),
                _ if field.starts_with("headers.") => {
                    let header_name = &field[8..];
                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ if field.starts_with("query.") => {
                    let param_name = &field[6..];
                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ => false,
            }
        } else {
            // Invalid regex never matches.
            false
        }
    }
    // Substring: `field contains "value"`.
    else if expr.contains("contains") {
        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let search_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "path" => path.contains(search_value),
            _ if field.starts_with("headers.") => {
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
            }
            _ if field.starts_with("body") => {
                if let Some(body_bytes) = body {
                    let body_str = String::from_utf8_lossy(body_bytes);
                    body_str.contains(search_value)
                } else {
                    false
                }
            }
            _ => false,
        }
    } else {
        tracing::warn!("Unknown custom matcher expression format: {}", expr);
        false
    }
}
604
/// Runtime statistics reported by the `/stats` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds since the management state was created.
    pub uptime_seconds: u64,
    /// Total requests counted so far.
    pub total_requests: u64,
    /// Number of registered mocks (enabled or not).
    pub active_mocks: usize,
    /// Number of mocks currently enabled.
    pub enabled_mocks: usize,
    /// Number of registered routes (currently approximated by mock count).
    pub registered_routes: usize,
}
619
/// Static server configuration reported by the `/config` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (from `CARGO_PKG_VERSION`).
    pub version: String,
    /// Listening port.
    pub port: u16,
    /// Whether an OpenAPI spec is loaded.
    pub has_openapi_spec: bool,
    /// Path the spec was loaded from, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
633
/// Shared state for the management HTTP API.
///
/// Cheap to clone (all heavyweight fields are behind `Arc`); one instance is
/// shared across all axum handlers.
#[derive(Clone)]
pub struct ManagementState {
    /// In-memory registry of user-defined mocks.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Loaded OpenAPI spec, if one was provided.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Filesystem path the spec was loaded from, if any.
    pub spec_path: Option<String>,
    /// Port the managed server listens on.
    pub port: u16,
    /// Creation time, used for uptime reporting.
    pub start_time: std::time::Instant,
    /// Request counter (incremented elsewhere; reported by `/stats`).
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// SMTP mailbox registry (feature `smtp`).
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Embedded MQTT broker (feature `mqtt`).
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Embedded Kafka mock broker (feature `kafka`).
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel fanning broker message events out to subscribers.
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Manager for scenario state machines.
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Optional WebSocket broadcast channel for mock CRUD events.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Optional lifecycle hook registry invoked on mock CRUD operations.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Intelligent-behavior rule explanations, keyed by string id.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Chaos-engineering API state (feature `chaos`).
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Live server configuration, when attached via `with_server_config`.
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
685
impl ManagementState {
    /// Create management state for a server on `port`, optionally backed by
    /// an OpenAPI `spec` loaded from `spec_path`.
    ///
    /// All optional integrations (proxy, SMTP, MQTT, Kafka, chaos, …) start
    /// unset; attach them with the `with_*` builder methods below.
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Capacity is env-tunable; the receiver half is dropped here
                // because subscribers call `subscribe()` on the sender later.
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
        }
    }

    /// Attach a lifecycle hook registry (builder-style).
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Attach a WebSocket broadcast channel for mock CRUD events.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Attach a proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Attach the SMTP mailbox registry (feature `smtp`).
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Attach the embedded MQTT broker (feature `mqtt`).
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Attach the embedded Kafka mock broker (feature `kafka`).
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Attach the chaos-engineering API state (feature `chaos`).
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Attach a live server configuration so `bulk_update_config` can apply
    /// validated updates.
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
796
797async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
799 let mocks = state.mocks.read().await;
800 Json(serde_json::json!({
801 "mocks": *mocks,
802 "total": mocks.len(),
803 "enabled": mocks.iter().filter(|m| m.enabled).count()
804 }))
805}
806
807async fn get_mock(
809 State(state): State<ManagementState>,
810 Path(id): Path<String>,
811) -> Result<Json<MockConfig>, StatusCode> {
812 let mocks = state.mocks.read().await;
813 mocks
814 .iter()
815 .find(|m| m.id == id)
816 .cloned()
817 .map(Json)
818 .ok_or(StatusCode::NOT_FOUND)
819}
820
/// POST /mocks — create a new mock.
///
/// Generates a UUID when the client supplied no id, rejects duplicate ids
/// with 409 CONFLICT, fires the `Created` lifecycle hook, and broadcasts a
/// WebSocket event when a channel is configured.
async fn create_mock(
    State(state): State<ManagementState>,
    Json(mut mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    // Assign a server-side id when the client left it empty.
    if mock.id.is_empty() {
        mock.id = uuid::Uuid::new_v4().to_string();
    }

    if mocks.iter().any(|m| m.id == mock.id) {
        return Err(StatusCode::CONFLICT);
    }

    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);

    // NOTE(review): the hook fires before the mock is pushed into the list;
    // a hook inspecting state would not yet see it — confirm this ordering.
    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
            id: mock.id.clone(),
            name: mock.name.clone(),
            config: serde_json::to_value(&mock).unwrap_or_default(),
        };
        hooks.invoke_mock_created(&event).await;
    }

    mocks.push(mock.clone());

    // Best-effort notification; send errors (no subscribers) are ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
    }

    Ok(Json(mock))
}
859
/// PUT /mocks/{id} — replace an existing mock wholesale.
///
/// Returns 404 when no mock with `id` exists. Fires the `Updated` lifecycle
/// hook, plus an `Enabled`/`Disabled` hook when the enabled flag flipped, and
/// broadcasts a WebSocket event when configured.
///
/// NOTE(review): the path `id` is used only for lookup; the stored mock takes
/// the id from the request body — confirm callers keep the two consistent.
async fn update_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
    Json(updated_mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    // Keep the previous version to detect an enabled-flag transition below.
    let old_mock = mocks[position].clone();

    info!("Updating mock: {}", id);
    mocks[position] = updated_mock.clone();

    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
            id: updated_mock.id.clone(),
            name: updated_mock.name.clone(),
            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
        };
        hooks.invoke_mock_updated(&event).await;

        // Fire a dedicated state-change event when enabled flipped.
        if old_mock.enabled != updated_mock.enabled {
            let state_event = if updated_mock.enabled {
                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
                    id: updated_mock.id.clone(),
                }
            } else {
                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
                    id: updated_mock.id.clone(),
                }
            };
            hooks.invoke_mock_state_changed(&state_event).await;
        }
    }

    // Best-effort notification; send errors are ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
    }

    Ok(Json(updated_mock))
}
907
908async fn delete_mock(
910 State(state): State<ManagementState>,
911 Path(id): Path<String>,
912) -> Result<StatusCode, StatusCode> {
913 let mut mocks = state.mocks.write().await;
914
915 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
916
917 let deleted_mock = mocks[position].clone();
919
920 info!("Deleting mock: {}", id);
921 mocks.remove(position);
922
923 if let Some(hooks) = &state.lifecycle_hooks {
925 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
926 id: deleted_mock.id.clone(),
927 name: deleted_mock.name.clone(),
928 };
929 hooks.invoke_mock_deleted(&event).await;
930 }
931
932 if let Some(tx) = &state.ws_broadcast {
934 let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
935 }
936
937 Ok(StatusCode::NO_CONTENT)
938}
939
/// Request body for `POST /config/validate`.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration document to validate (always arrives as JSON).
    pub config: serde_json::Value,
    /// Declared source format: "json" (default), "yaml" or "yml".
    #[serde(default = "default_format")]
    pub format: String,
}
949
/// Serde default for `ValidateConfigRequest::format` ("json").
fn default_format() -> String {
    String::from("json")
}
953
/// POST /config/validate — check that a config document deserializes into
/// `ServerConfig`, without applying it.
///
/// NOTE(review): for the "yaml" format the JSON value is serialized back to a
/// JSON string and parsed with serde_yaml (valid because YAML is a superset
/// of JSON) — the original YAML text itself is never seen here; confirm
/// that is acceptable for callers validating YAML files.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                    .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        // Any other declared format is treated as JSON.
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
        .into_response(),
    }
}
996
/// Request body for `POST /config/bulk` — a JSON object of top-level keys to
/// overwrite in the server configuration.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Top-level config keys and their replacement values.
    pub updates: serde_json::Value,
}
1003
1004async fn bulk_update_config(
1012 State(state): State<ManagementState>,
1013 Json(request): Json<BulkConfigUpdateRequest>,
1014) -> impl IntoResponse {
1015 if !request.updates.is_object() {
1017 return (
1018 StatusCode::BAD_REQUEST,
1019 Json(serde_json::json!({
1020 "error": "Invalid request",
1021 "message": "Updates must be a JSON object"
1022 })),
1023 )
1024 .into_response();
1025 }
1026
1027 use mockforge_core::config::ServerConfig;
1029
1030 let base_config = ServerConfig::default();
1032 let base_json = match serde_json::to_value(&base_config) {
1033 Ok(v) => v,
1034 Err(e) => {
1035 return (
1036 StatusCode::INTERNAL_SERVER_ERROR,
1037 Json(serde_json::json!({
1038 "error": "Internal error",
1039 "message": format!("Failed to serialize base config: {}", e)
1040 })),
1041 )
1042 .into_response();
1043 }
1044 };
1045
1046 let mut merged = base_json.clone();
1048 if let (Some(merged_obj), Some(updates_obj)) =
1049 (merged.as_object_mut(), request.updates.as_object())
1050 {
1051 for (key, value) in updates_obj {
1052 merged_obj.insert(key.clone(), value.clone());
1053 }
1054 }
1055
1056 match serde_json::from_value::<ServerConfig>(merged) {
1058 Ok(validated_config) => {
1059 if let Some(ref config_lock) = state.server_config {
1061 let mut config = config_lock.write().await;
1062 *config = validated_config;
1063 Json(serde_json::json!({
1064 "success": true,
1065 "message": "Bulk configuration update applied successfully",
1066 "updates_received": request.updates,
1067 "validated": true,
1068 "applied": true
1069 }))
1070 .into_response()
1071 } else {
1072 Json(serde_json::json!({
1073 "success": true,
1074 "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
1075 "updates_received": request.updates,
1076 "validated": true,
1077 "applied": false
1078 }))
1079 .into_response()
1080 }
1081 }
1082 Err(e) => (
1083 StatusCode::BAD_REQUEST,
1084 Json(serde_json::json!({
1085 "error": "Invalid configuration",
1086 "message": format!("Configuration validation failed: {}", e),
1087 "validated": false
1088 })),
1089 )
1090 .into_response(),
1091 }
1092}
1093
1094async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1096 let mocks = state.mocks.read().await;
1097 let request_count = *state.request_counter.read().await;
1098
1099 Json(ServerStats {
1100 uptime_seconds: state.start_time.elapsed().as_secs(),
1101 total_requests: request_count,
1102 active_mocks: mocks.len(),
1103 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1104 registered_routes: mocks.len(), })
1106}
1107
/// GET /config — static snapshot of the server configuration.
async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
    Json(ServerConfig {
        // Crate version baked in at compile time.
        version: env!("CARGO_PKG_VERSION").to_string(),
        port: state.port,
        has_openapi_spec: state.spec.is_some(),
        spec_path: state.spec_path.clone(),
    })
}
1117
1118async fn health_check() -> Json<serde_json::Value> {
1120 Json(serde_json::json!({
1121 "status": "healthy",
1122 "service": "mockforge-management",
1123 "timestamp": chrono::Utc::now().to_rfc3339()
1124 }))
1125}
1126
/// Serialization format for the mock export endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON (default).
    Json,
    /// YAML.
    Yaml,
}
1136
1137async fn export_mocks(
1139 State(state): State<ManagementState>,
1140 Query(params): Query<std::collections::HashMap<String, String>>,
1141) -> Result<(StatusCode, String), StatusCode> {
1142 let mocks = state.mocks.read().await;
1143
1144 let format = params
1145 .get("format")
1146 .map(|f| match f.as_str() {
1147 "yaml" | "yml" => ExportFormat::Yaml,
1148 _ => ExportFormat::Json,
1149 })
1150 .unwrap_or(ExportFormat::Json);
1151
1152 match format {
1153 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1154 .map(|json| (StatusCode::OK, json))
1155 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1156 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1157 .map(|yaml| (StatusCode::OK, yaml))
1158 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1159 }
1160}
1161
1162async fn import_mocks(
1164 State(state): State<ManagementState>,
1165 Json(mocks): Json<Vec<MockConfig>>,
1166) -> impl IntoResponse {
1167 let mut current_mocks = state.mocks.write().await;
1168 current_mocks.clear();
1169 current_mocks.extend(mocks);
1170 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1171}
1172
/// GET /smtp/emails — list every captured email.
///
/// Returns 501 NOT_IMPLEMENTED when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_emails() {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1197
/// GET /smtp/emails/{id} — fetch a single captured email by id.
///
/// Returns 404 for unknown ids and 501 when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_email_by_id(&id) {
            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
            Ok(None) => (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Email not found",
                    "id": id
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve email",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1232
/// DELETE /smtp/emails — clear all captured emails.
///
/// Returns 501 NOT_IMPLEMENTED when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.clear_mailbox() {
            Ok(()) => (
                StatusCode::OK,
                Json(serde_json::json!({
                    "message": "Mailbox cleared successfully"
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to clear mailbox",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1262
/// GET /smtp/mailbox/export?format=… — placeholder endpoint.
///
/// Always answers 501 NOT_IMPLEMENTED: the SMTP server runs separately and
/// its mailbox is only reachable via the CLI. Echoes the requested format.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Fix: avoid allocating a default String even when the parameter is
    // present (was `unwrap_or(&"json".to_string()).clone()`).
    let format = params.get("format").cloned().unwrap_or_else(|| "json".to_string());
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1278
/// GET /smtp/emails/search — filter captured emails by query parameters.
///
/// Supported params: `sender`, `recipient`, `subject`, `body`, `since`/
/// `until` (RFC 3339; unparsable values are silently dropped), `regex` and
/// `case_sensitive` ("true" to enable). Returns 501 when no SMTP registry is
/// attached.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        let filters = EmailSearchFilters {
            sender: params.get("sender").cloned(),
            recipient: params.get("recipient").cloned(),
            subject: params.get("subject").cloned(),
            body: params.get("body").cloned(),
            // Timestamps are parsed as RFC 3339 and normalized to UTC.
            since: params
                .get("since")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            until: params
                .get("until")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
        };

        match smtp_registry.search_emails(filters) {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to search emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1323
/// Snapshot of the embedded MQTT broker's current statistics,
/// as served by GET /mqtt/stats.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of currently connected MQTT clients.
    pub connected_clients: usize,
    /// Number of active topics reported by the broker.
    pub active_topics: usize,
    /// Count of retained messages held by the broker.
    pub retained_messages: usize,
    /// Total number of subscriptions across all clients.
    pub total_subscriptions: usize,
}
1337
/// GET /mqtt/stats — return a [`MqttBrokerStats`] snapshot as JSON,
/// or 503 when no broker is running.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match &state.mqtt_broker {
        Some(b) => b,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
        }
    };

    let topic_stats = broker.get_topic_stats().await;
    Json(MqttBrokerStats {
        connected_clients: broker.get_connected_clients().await.len(),
        active_topics: broker.get_active_topics().await.len(),
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1358
/// GET /mqtt/clients — list currently connected MQTT clients.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            Json(serde_json::json!({ "clients": clients })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1371
/// GET /mqtt/topics — list topics currently active on the broker.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            Json(serde_json::json!({ "topics": topics })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1384
/// DELETE /mqtt/clients/{client_id} — forcibly disconnect one MQTT client.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                .into_response()
        }
    }
}
1404
/// Request body for publishing a single MQTT message.
///
/// NOTE(review): the publish handlers currently parse the raw JSON body
/// by hand; this type mirrors that schema and is used as the element type
/// of [`MqttBatchPublishRequest::messages`].
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    // Topic to publish to (required).
    pub topic: String,
    // Message payload as a string (required).
    pub payload: String,
    // Quality of service level; defaults to 0 via `default_qos`.
    #[serde(default = "default_qos")]
    pub qos: u8,
    // Whether the broker should retain the message; defaults to false.
    #[serde(default)]
    pub retain: bool,
}
1422
/// Serde default for [`MqttPublishRequest::qos`]: QoS level 0.
#[cfg(feature = "mqtt")]
fn default_qos() -> u8 {
    0
}
1427
/// POST /mqtt/publish — publish one message through the embedded broker.
///
/// Accepts a raw JSON body with required string fields `topic` and `payload`,
/// plus optional `qos` (default 0, must be 0-2) and `retain` (default false).
/// A successful publish is also broadcast on the message-event channel for
/// SSE stream subscribers.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    // Both topic and payload must be present as strings.
    let (Some(topic), Some(payload)) = (
        request.get("topic").and_then(|v| v.as_str()).map(str::to_string),
        request.get("payload").and_then(|v| v.as_str()).map(str::to_string),
    ) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    };

    let Some(broker) = &state.mqtt_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        );
    };

    // MQTT only defines QoS levels 0, 1 and 2.
    if qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }

    let client_id = "mockforge-management-api".to_string();
    let publish_result = broker
        .handle_publish(&client_id, &topic, payload.as_bytes().to_vec(), qos, retain)
        .await;

    match publish_result {
        Ok(_) => {
            // Notify SSE subscribers; a send error just means nobody listens.
            let _ = state.message_events.send(MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload: payload.clone(),
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            }));

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": format!("{}", e)
            })),
        ),
    }
}
1515
/// Fallback for POST /mqtt/publish when MQTT support is compiled out:
/// always answers 503 with an explanatory JSON body.
#[cfg(not(feature = "mqtt"))]
async fn publish_mqtt_message_handler(
    State(_state): State<ManagementState>,
    Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
    (
        StatusCode::SERVICE_UNAVAILABLE,
        Json(serde_json::json!({
            "error": "MQTT feature not enabled",
            "message": "MQTT support is not compiled into this build"
        })),
    )
}
1530
/// Request body for publishing a batch of MQTT messages in order.
///
/// NOTE(review): the batch handler currently parses the raw JSON body by
/// hand; this type documents the same schema.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    // Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    // Pause between consecutive messages in milliseconds; defaults to 100.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1541
/// Serde default for [`MqttBatchPublishRequest::delay_ms`]: 100 ms.
#[cfg(feature = "mqtt")]
fn default_delay() -> u64 {
    100
}
1546
/// POST /mqtt/publish/batch — publish a sequence of MQTT messages in order.
///
/// Accepts a raw JSON body with a required `messages` array (each element has
/// `topic`/`payload` plus optional `qos`/`retain`) and an optional `delay_ms`
/// pause between messages (default 100). Per-message failures are recorded in
/// the `results` array instead of aborting the whole batch, and the response
/// summarizes total/succeeded/failed counts.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        // Reject an empty batch up front (also keeps the later
        // `len() - 1` delay check safe from underflow).
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        let mut results = Vec::new();
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            // Extract per-message fields; qos defaults to 0, retain to false.
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Invalid entries are reported per index and skipped, not fatal.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Broadcast the published message to SSE stream subscribers;
                    // a send error only means there are no receivers right now.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Pace the batch: sleep between messages, but not after the last.
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1676
/// Fallback for POST /mqtt/publish/batch when MQTT support is compiled out:
/// always answers 503 with an explanatory JSON body.
#[cfg(not(feature = "mqtt"))]
async fn publish_mqtt_batch_handler(
    State(_state): State<ManagementState>,
    Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
    (
        StatusCode::SERVICE_UNAVAILABLE,
        Json(serde_json::json!({
            "error": "MQTT feature not enabled",
            "message": "MQTT support is not compiled into this build"
        })),
    )
}
1691
/// Request body for explicitly setting a migration mode on a route or group.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // One of "mock", "shadow", "real", "auto" (matched case-insensitively).
    mode: String,
}
1699
1700async fn get_migration_routes(
1702 State(state): State<ManagementState>,
1703) -> Result<Json<serde_json::Value>, StatusCode> {
1704 let proxy_config = match &state.proxy_config {
1705 Some(config) => config,
1706 None => {
1707 return Ok(Json(serde_json::json!({
1708 "error": "Migration not configured. Proxy config not available."
1709 })));
1710 }
1711 };
1712
1713 let config = proxy_config.read().await;
1714 let routes = config.get_migration_routes();
1715
1716 Ok(Json(serde_json::json!({
1717 "routes": routes
1718 })))
1719}
1720
1721async fn toggle_route_migration(
1723 State(state): State<ManagementState>,
1724 Path(pattern): Path<String>,
1725) -> Result<Json<serde_json::Value>, StatusCode> {
1726 let proxy_config = match &state.proxy_config {
1727 Some(config) => config,
1728 None => {
1729 return Ok(Json(serde_json::json!({
1730 "error": "Migration not configured. Proxy config not available."
1731 })));
1732 }
1733 };
1734
1735 let mut config = proxy_config.write().await;
1736 let new_mode = match config.toggle_route_migration(&pattern) {
1737 Some(mode) => mode,
1738 None => {
1739 return Ok(Json(serde_json::json!({
1740 "error": format!("Route pattern not found: {}", pattern)
1741 })));
1742 }
1743 };
1744
1745 Ok(Json(serde_json::json!({
1746 "pattern": pattern,
1747 "mode": format!("{:?}", new_mode).to_lowercase()
1748 })))
1749}
1750
1751async fn set_route_migration_mode(
1753 State(state): State<ManagementState>,
1754 Path(pattern): Path<String>,
1755 Json(request): Json<SetMigrationModeRequest>,
1756) -> Result<Json<serde_json::Value>, StatusCode> {
1757 let proxy_config = match &state.proxy_config {
1758 Some(config) => config,
1759 None => {
1760 return Ok(Json(serde_json::json!({
1761 "error": "Migration not configured. Proxy config not available."
1762 })));
1763 }
1764 };
1765
1766 use mockforge_core::proxy::config::MigrationMode;
1767 let mode = match request.mode.to_lowercase().as_str() {
1768 "mock" => MigrationMode::Mock,
1769 "shadow" => MigrationMode::Shadow,
1770 "real" => MigrationMode::Real,
1771 "auto" => MigrationMode::Auto,
1772 _ => {
1773 return Ok(Json(serde_json::json!({
1774 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1775 })));
1776 }
1777 };
1778
1779 let mut config = proxy_config.write().await;
1780 let updated = config.update_rule_migration_mode(&pattern, mode);
1781
1782 if !updated {
1783 return Ok(Json(serde_json::json!({
1784 "error": format!("Route pattern not found: {}", pattern)
1785 })));
1786 }
1787
1788 Ok(Json(serde_json::json!({
1789 "pattern": pattern,
1790 "mode": format!("{:?}", mode).to_lowercase()
1791 })))
1792}
1793
1794async fn toggle_group_migration(
1796 State(state): State<ManagementState>,
1797 Path(group): Path<String>,
1798) -> Result<Json<serde_json::Value>, StatusCode> {
1799 let proxy_config = match &state.proxy_config {
1800 Some(config) => config,
1801 None => {
1802 return Ok(Json(serde_json::json!({
1803 "error": "Migration not configured. Proxy config not available."
1804 })));
1805 }
1806 };
1807
1808 let mut config = proxy_config.write().await;
1809 let new_mode = config.toggle_group_migration(&group);
1810
1811 Ok(Json(serde_json::json!({
1812 "group": group,
1813 "mode": format!("{:?}", new_mode).to_lowercase()
1814 })))
1815}
1816
1817async fn set_group_migration_mode(
1819 State(state): State<ManagementState>,
1820 Path(group): Path<String>,
1821 Json(request): Json<SetMigrationModeRequest>,
1822) -> Result<Json<serde_json::Value>, StatusCode> {
1823 let proxy_config = match &state.proxy_config {
1824 Some(config) => config,
1825 None => {
1826 return Ok(Json(serde_json::json!({
1827 "error": "Migration not configured. Proxy config not available."
1828 })));
1829 }
1830 };
1831
1832 use mockforge_core::proxy::config::MigrationMode;
1833 let mode = match request.mode.to_lowercase().as_str() {
1834 "mock" => MigrationMode::Mock,
1835 "shadow" => MigrationMode::Shadow,
1836 "real" => MigrationMode::Real,
1837 "auto" => MigrationMode::Auto,
1838 _ => {
1839 return Ok(Json(serde_json::json!({
1840 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1841 })));
1842 }
1843 };
1844
1845 let mut config = proxy_config.write().await;
1846 config.update_group_migration_mode(&group, mode);
1847
1848 Ok(Json(serde_json::json!({
1849 "group": group,
1850 "mode": format!("{:?}", mode).to_lowercase()
1851 })))
1852}
1853
1854async fn get_migration_groups(
1856 State(state): State<ManagementState>,
1857) -> Result<Json<serde_json::Value>, StatusCode> {
1858 let proxy_config = match &state.proxy_config {
1859 Some(config) => config,
1860 None => {
1861 return Ok(Json(serde_json::json!({
1862 "error": "Migration not configured. Proxy config not available."
1863 })));
1864 }
1865 };
1866
1867 let config = proxy_config.read().await;
1868 let groups = config.get_migration_groups();
1869
1870 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1872 .into_iter()
1873 .map(|(name, info)| {
1874 (
1875 name,
1876 serde_json::json!({
1877 "name": info.name,
1878 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1879 "route_count": info.route_count
1880 }),
1881 )
1882 })
1883 .collect();
1884
1885 Ok(Json(serde_json::json!(groups_json)))
1886}
1887
1888async fn get_migration_status(
1890 State(state): State<ManagementState>,
1891) -> Result<Json<serde_json::Value>, StatusCode> {
1892 let proxy_config = match &state.proxy_config {
1893 Some(config) => config,
1894 None => {
1895 return Ok(Json(serde_json::json!({
1896 "error": "Migration not configured. Proxy config not available."
1897 })));
1898 }
1899 };
1900
1901 let config = proxy_config.read().await;
1902 let routes = config.get_migration_routes();
1903 let groups = config.get_migration_groups();
1904
1905 let mut mock_count = 0;
1906 let mut shadow_count = 0;
1907 let mut real_count = 0;
1908 let mut auto_count = 0;
1909
1910 for route in &routes {
1911 match route.migration_mode {
1912 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1913 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1914 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1915 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1916 }
1917 }
1918
1919 Ok(Json(serde_json::json!({
1920 "total_routes": routes.len(),
1921 "mock_routes": mock_count,
1922 "shadow_routes": shadow_count,
1923 "real_routes": real_count,
1924 "auto_routes": auto_count,
1925 "total_groups": groups.len(),
1926 "migration_enabled": config.migration_enabled
1927 })))
1928}
1929
/// Request body for creating or updating a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    // Pattern identifying which requests/responses the rule matches.
    pub pattern: String,
    // Rule kind: "request" or "response" (wire name: `type`).
    #[serde(rename = "type")]
    pub rule_type: String,
    // Status codes the rule applies to; only used for response rules.
    #[serde(default)]
    pub status_codes: Vec<u16>,
    // Body transforms to apply; create requires at least one.
    pub body_transforms: Vec<BodyTransformRequest>,
    // Whether the rule is active; defaults to true.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1949
/// One body transform in its API (string-typed) representation.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    // Path of the body field the transform targets.
    pub path: String,
    // Replacement value for the targeted field.
    pub replace: String,
    // Operation name: "replace", "add" or "remove"; any other value
    // is treated as "replace" by the handlers.
    #[serde(default)]
    pub operation: String,
}
1961
/// A proxy rule as returned by the list endpoint.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    // Flat rule index: request rules first, then response rules.
    pub id: usize,
    // Pattern identifying which requests/responses the rule matches.
    pub pattern: String,
    // Rule kind: "request" or "response" (wire name: `type`).
    #[serde(rename = "type")]
    pub rule_type: String,
    // Status codes the rule applies to (empty for request rules).
    pub status_codes: Vec<u16>,
    // The rule's body transforms in string-typed form.
    pub body_transforms: Vec<BodyTransformRequest>,
    // Whether the rule is active.
    pub enabled: bool,
}
1979
1980async fn list_proxy_rules(
1982 State(state): State<ManagementState>,
1983) -> Result<Json<serde_json::Value>, StatusCode> {
1984 let proxy_config = match &state.proxy_config {
1985 Some(config) => config,
1986 None => {
1987 return Ok(Json(serde_json::json!({
1988 "error": "Proxy not configured. Proxy config not available."
1989 })));
1990 }
1991 };
1992
1993 let config = proxy_config.read().await;
1994
1995 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1996
1997 for (idx, rule) in config.request_replacements.iter().enumerate() {
1999 rules.push(ProxyRuleResponse {
2000 id: idx,
2001 pattern: rule.pattern.clone(),
2002 rule_type: "request".to_string(),
2003 status_codes: Vec::new(),
2004 body_transforms: rule
2005 .body_transforms
2006 .iter()
2007 .map(|t| BodyTransformRequest {
2008 path: t.path.clone(),
2009 replace: t.replace.clone(),
2010 operation: format!("{:?}", t.operation).to_lowercase(),
2011 })
2012 .collect(),
2013 enabled: rule.enabled,
2014 });
2015 }
2016
2017 let request_count = config.request_replacements.len();
2019 for (idx, rule) in config.response_replacements.iter().enumerate() {
2020 rules.push(ProxyRuleResponse {
2021 id: request_count + idx,
2022 pattern: rule.pattern.clone(),
2023 rule_type: "response".to_string(),
2024 status_codes: rule.status_codes.clone(),
2025 body_transforms: rule
2026 .body_transforms
2027 .iter()
2028 .map(|t| BodyTransformRequest {
2029 path: t.path.clone(),
2030 replace: t.replace.clone(),
2031 operation: format!("{:?}", t.operation).to_lowercase(),
2032 })
2033 .collect(),
2034 enabled: rule.enabled,
2035 });
2036 }
2037
2038 Ok(Json(serde_json::json!({
2039 "rules": rules
2040 })))
2041}
2042
2043async fn create_proxy_rule(
2045 State(state): State<ManagementState>,
2046 Json(request): Json<ProxyRuleRequest>,
2047) -> Result<Json<serde_json::Value>, StatusCode> {
2048 let proxy_config = match &state.proxy_config {
2049 Some(config) => config,
2050 None => {
2051 return Ok(Json(serde_json::json!({
2052 "error": "Proxy not configured. Proxy config not available."
2053 })));
2054 }
2055 };
2056
2057 if request.body_transforms.is_empty() {
2059 return Ok(Json(serde_json::json!({
2060 "error": "At least one body transform is required"
2061 })));
2062 }
2063
2064 let body_transforms: Vec<BodyTransform> = request
2065 .body_transforms
2066 .iter()
2067 .map(|t| {
2068 let op = match t.operation.as_str() {
2069 "replace" => TransformOperation::Replace,
2070 "add" => TransformOperation::Add,
2071 "remove" => TransformOperation::Remove,
2072 _ => TransformOperation::Replace,
2073 };
2074 BodyTransform {
2075 path: t.path.clone(),
2076 replace: t.replace.clone(),
2077 operation: op,
2078 }
2079 })
2080 .collect();
2081
2082 let new_rule = BodyTransformRule {
2083 pattern: request.pattern.clone(),
2084 status_codes: request.status_codes.clone(),
2085 body_transforms,
2086 enabled: request.enabled,
2087 };
2088
2089 let mut config = proxy_config.write().await;
2090
2091 let rule_id = if request.rule_type == "request" {
2092 config.request_replacements.push(new_rule);
2093 config.request_replacements.len() - 1
2094 } else if request.rule_type == "response" {
2095 config.response_replacements.push(new_rule);
2096 config.request_replacements.len() + config.response_replacements.len() - 1
2097 } else {
2098 return Ok(Json(serde_json::json!({
2099 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2100 })));
2101 };
2102
2103 Ok(Json(serde_json::json!({
2104 "id": rule_id,
2105 "message": "Rule created successfully"
2106 })))
2107}
2108
2109async fn get_proxy_rule(
2111 State(state): State<ManagementState>,
2112 Path(id): Path<String>,
2113) -> Result<Json<serde_json::Value>, StatusCode> {
2114 let proxy_config = match &state.proxy_config {
2115 Some(config) => config,
2116 None => {
2117 return Ok(Json(serde_json::json!({
2118 "error": "Proxy not configured. Proxy config not available."
2119 })));
2120 }
2121 };
2122
2123 let config = proxy_config.read().await;
2124 let rule_id: usize = match id.parse() {
2125 Ok(id) => id,
2126 Err(_) => {
2127 return Ok(Json(serde_json::json!({
2128 "error": format!("Invalid rule ID: {}", id)
2129 })));
2130 }
2131 };
2132
2133 let request_count = config.request_replacements.len();
2134
2135 if rule_id < request_count {
2136 let rule = &config.request_replacements[rule_id];
2138 Ok(Json(serde_json::json!({
2139 "id": rule_id,
2140 "pattern": rule.pattern,
2141 "type": "request",
2142 "status_codes": [],
2143 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2144 "path": t.path,
2145 "replace": t.replace,
2146 "operation": format!("{:?}", t.operation).to_lowercase()
2147 })).collect::<Vec<_>>(),
2148 "enabled": rule.enabled
2149 })))
2150 } else if rule_id < request_count + config.response_replacements.len() {
2151 let response_idx = rule_id - request_count;
2153 let rule = &config.response_replacements[response_idx];
2154 Ok(Json(serde_json::json!({
2155 "id": rule_id,
2156 "pattern": rule.pattern,
2157 "type": "response",
2158 "status_codes": rule.status_codes,
2159 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2160 "path": t.path,
2161 "replace": t.replace,
2162 "operation": format!("{:?}", t.operation).to_lowercase()
2163 })).collect::<Vec<_>>(),
2164 "enabled": rule.enabled
2165 })))
2166 } else {
2167 Ok(Json(serde_json::json!({
2168 "error": format!("Rule ID {} not found", rule_id)
2169 })))
2170 }
2171}
2172
2173async fn update_proxy_rule(
2175 State(state): State<ManagementState>,
2176 Path(id): Path<String>,
2177 Json(request): Json<ProxyRuleRequest>,
2178) -> Result<Json<serde_json::Value>, StatusCode> {
2179 let proxy_config = match &state.proxy_config {
2180 Some(config) => config,
2181 None => {
2182 return Ok(Json(serde_json::json!({
2183 "error": "Proxy not configured. Proxy config not available."
2184 })));
2185 }
2186 };
2187
2188 let mut config = proxy_config.write().await;
2189 let rule_id: usize = match id.parse() {
2190 Ok(id) => id,
2191 Err(_) => {
2192 return Ok(Json(serde_json::json!({
2193 "error": format!("Invalid rule ID: {}", id)
2194 })));
2195 }
2196 };
2197
2198 let body_transforms: Vec<BodyTransform> = request
2199 .body_transforms
2200 .iter()
2201 .map(|t| {
2202 let op = match t.operation.as_str() {
2203 "replace" => TransformOperation::Replace,
2204 "add" => TransformOperation::Add,
2205 "remove" => TransformOperation::Remove,
2206 _ => TransformOperation::Replace,
2207 };
2208 BodyTransform {
2209 path: t.path.clone(),
2210 replace: t.replace.clone(),
2211 operation: op,
2212 }
2213 })
2214 .collect();
2215
2216 let updated_rule = BodyTransformRule {
2217 pattern: request.pattern.clone(),
2218 status_codes: request.status_codes.clone(),
2219 body_transforms,
2220 enabled: request.enabled,
2221 };
2222
2223 let request_count = config.request_replacements.len();
2224
2225 if rule_id < request_count {
2226 config.request_replacements[rule_id] = updated_rule;
2228 } else if rule_id < request_count + config.response_replacements.len() {
2229 let response_idx = rule_id - request_count;
2231 config.response_replacements[response_idx] = updated_rule;
2232 } else {
2233 return Ok(Json(serde_json::json!({
2234 "error": format!("Rule ID {} not found", rule_id)
2235 })));
2236 }
2237
2238 Ok(Json(serde_json::json!({
2239 "id": rule_id,
2240 "message": "Rule updated successfully"
2241 })))
2242}
2243
2244async fn delete_proxy_rule(
2246 State(state): State<ManagementState>,
2247 Path(id): Path<String>,
2248) -> Result<Json<serde_json::Value>, StatusCode> {
2249 let proxy_config = match &state.proxy_config {
2250 Some(config) => config,
2251 None => {
2252 return Ok(Json(serde_json::json!({
2253 "error": "Proxy not configured. Proxy config not available."
2254 })));
2255 }
2256 };
2257
2258 let mut config = proxy_config.write().await;
2259 let rule_id: usize = match id.parse() {
2260 Ok(id) => id,
2261 Err(_) => {
2262 return Ok(Json(serde_json::json!({
2263 "error": format!("Invalid rule ID: {}", id)
2264 })));
2265 }
2266 };
2267
2268 let request_count = config.request_replacements.len();
2269
2270 if rule_id < request_count {
2271 config.request_replacements.remove(rule_id);
2273 } else if rule_id < request_count + config.response_replacements.len() {
2274 let response_idx = rule_id - request_count;
2276 config.response_replacements.remove(response_idx);
2277 } else {
2278 return Ok(Json(serde_json::json!({
2279 "error": format!("Rule ID {} not found", rule_id)
2280 })));
2281 }
2282
2283 Ok(Json(serde_json::json!({
2284 "id": rule_id,
2285 "message": "Rule deleted successfully"
2286 })))
2287}
2288
2289async fn get_proxy_inspect(
2291 State(state): State<ManagementState>,
2292 Query(params): Query<std::collections::HashMap<String, String>>,
2293) -> Result<Json<serde_json::Value>, StatusCode> {
2294 let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2295 let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2296
2297 let proxy_config = match &state.proxy_config {
2298 Some(config) => config.read().await,
2299 None => {
2300 return Ok(Json(serde_json::json!({
2301 "error": "Proxy not configured. Proxy config not available."
2302 })));
2303 }
2304 };
2305
2306 let mut rules = Vec::new();
2307 for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2308 rules.push(serde_json::json!({
2309 "id": idx,
2310 "kind": "request",
2311 "pattern": rule.pattern,
2312 "enabled": rule.enabled,
2313 "status_codes": rule.status_codes,
2314 "transform_count": rule.body_transforms.len(),
2315 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2316 "path": t.path,
2317 "operation": t.operation,
2318 "replace": t.replace
2319 })).collect::<Vec<_>>()
2320 }));
2321 }
2322 let request_rule_count = rules.len();
2323 for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2324 rules.push(serde_json::json!({
2325 "id": request_rule_count + idx,
2326 "kind": "response",
2327 "pattern": rule.pattern,
2328 "enabled": rule.enabled,
2329 "status_codes": rule.status_codes,
2330 "transform_count": rule.body_transforms.len(),
2331 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2332 "path": t.path,
2333 "operation": t.operation,
2334 "replace": t.replace
2335 })).collect::<Vec<_>>()
2336 }));
2337 }
2338
2339 let total = rules.len();
2340 let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2341
2342 Ok(Json(serde_json::json!({
2343 "enabled": proxy_config.enabled,
2344 "target_url": proxy_config.target_url,
2345 "prefix": proxy_config.prefix,
2346 "timeout_seconds": proxy_config.timeout_seconds,
2347 "follow_redirects": proxy_config.follow_redirects,
2348 "passthrough_by_default": proxy_config.passthrough_by_default,
2349 "rules": paged_rules,
2350 "request_rule_count": request_rule_count,
2351 "response_rule_count": total.saturating_sub(request_rule_count),
2352 "limit": limit,
2353 "offset": offset,
2354 "total": total
2355 })))
2356}
2357
/// Build the management API [`Router`], wiring every handler to its route.
///
/// Protocol-specific endpoints (SMTP, MQTT, Kafka) are only registered when
/// the corresponding cargo feature is compiled in; the MQTT publish routes
/// additionally get stub handlers that answer 503 when the feature is absent.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints: health, stats, config, mock CRUD, import/export.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox management (feature-gated).
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    #[cfg(not(feature = "smtp"))]
    let router = router;

    // MQTT broker management (feature-gated).
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Without the feature, the publish routes resolve to the 503 stubs.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka broker management (feature-gated).
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Mock-to-real migration management.
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy rule CRUD and inspection.
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI-assisted spec generation.
    let router = router.route("/ai/generate-spec", post(generate_ai_spec));

    // Snapshot diffing sub-router.
    let router = router.nest(
        "/snapshot-diff",
        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
    );

    #[cfg(feature = "behavioral-cloning")]
    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));

    // MockAI learning, chaos, and network-profile endpoints.
    let router = router
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    // State-machine API sub-router.
    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    router.with_state(state)
}
2461
#[cfg(feature = "kafka")]
/// Aggregate statistics for the embedded Kafka broker, returned by `GET /kafka/stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics currently known to the broker.
    pub topics: usize,
    /// Total partition count summed across all topics.
    pub partitions: usize,
    /// Number of registered consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced since broker start (from the metrics snapshot).
    pub messages_produced: u64,
    /// Total messages consumed since broker start (from the metrics snapshot).
    pub messages_consumed: u64,
}
2477
#[cfg(feature = "kafka")]
/// Summary of a single Kafka topic, as listed by `GET /kafka/topics`.
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions in the topic.
    pub partitions: usize,
    /// Configured replication factor.
    pub replication_factor: i32,
}
2486
#[cfg(feature = "kafka")]
/// Summary of a Kafka consumer group, as listed by `GET /kafka/groups`.
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier.
    pub group_id: String,
    /// Number of members currently in the group.
    pub members: usize,
    /// Reported group state (currently always "Stable"; see `get_kafka_groups`).
    pub state: String,
}
2495
#[cfg(feature = "kafka")]
/// `GET /kafka/stats` — report aggregate broker statistics.
///
/// Responds with 503 when no Kafka broker is configured on the management state.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    // Guard clause: bail out early when the broker is absent.
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;
    let metrics_snapshot = broker.metrics().snapshot();

    // Partition count is the sum across every topic.
    let partition_total: usize = topics.values().map(|topic| topic.partitions.len()).sum();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: partition_total,
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: metrics_snapshot.messages_produced_total,
        messages_consumed: metrics_snapshot.messages_consumed_total,
    })
    .into_response()
}
2528
#[cfg(feature = "kafka")]
/// `GET /kafka/topics` — list every topic with partition and replication info.
///
/// Responds with 503 when no Kafka broker is configured.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.kafka_broker {
        Some(broker) => {
            let topics = broker.topics.read().await;

            // Build the summary list with an explicit loop and preallocated capacity.
            let mut topic_list = Vec::with_capacity(topics.len());
            for (name, topic) in topics.iter() {
                topic_list.push(KafkaTopicInfo {
                    name: name.clone(),
                    partitions: topic.partitions.len(),
                    replication_factor: topic.config.replication_factor as i32,
                });
            }

            Json(serde_json::json!({
                "topics": topic_list
            }))
            .into_response()
        }
        None => (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response(),
    }
}
2558
#[cfg(feature = "kafka")]
/// `GET /kafka/topics/{topic}` — detail view of a single topic, including
/// per-partition message counts.
///
/// Responds with 404 when the topic is unknown and 503 when no broker exists.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // Per-partition detail; leader/replicas are fixed at broker id 0 here.
    let partitions_detail: Vec<_> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2600
#[cfg(feature = "kafka")]
/// `GET /kafka/groups` — list all consumer groups with member counts.
///
/// Responds with 503 when no Kafka broker is configured.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;

    let mut groups = Vec::new();
    for (group_id, group) in consumer_groups.groups().iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Group state is a hard-coded placeholder.
            state: "Stable".to_string(),
        });
    }

    Json(serde_json::json!({
        "groups": groups
    }))
    .into_response()
}
2631
#[cfg(feature = "kafka")]
/// `GET /kafka/groups/{group_id}` — detailed view of one consumer group,
/// including member assignments and committed offsets.
///
/// Responds with 404 for unknown groups and 503 when no broker exists.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let Some(group) = consumer_groups.groups().get(&group_id) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response();
    };

    // Per-member detail: client id plus topic/partition assignments.
    let members_detail: Vec<_> = group
        .members
        .iter()
        .map(|(member_id, member)| {
            let assignments: Vec<_> = member
                .assignment
                .iter()
                .map(|a| {
                    serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })
                })
                .collect();
            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": assignments
            })
        })
        .collect();

    // Committed offsets, keyed by (topic, partition).
    let offsets: Vec<_> = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect();

    Json(serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    }))
    .into_response()
}
2680
#[cfg(feature = "kafka")]
/// Request body for `POST /kafka/produce`.
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Target topic; auto-created with default config if it does not exist.
    pub topic: String,
    /// Optional message key; used for partition selection when `partition` is absent.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload as UTF-8 text.
    pub value: String,
    /// Explicit target partition; when omitted one is chosen from the key.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional string headers attached to the message.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2701
#[cfg(feature = "kafka")]
/// `POST /kafka/produce` — produce a single message to a topic.
///
/// The topic is auto-created with default configuration when missing. An
/// explicit `partition` is validated against the topic's partition count;
/// otherwise a partition is derived from the message key. On success the
/// message is also broadcast to SSE stream subscribers.
///
/// Responses: 200 with topic/partition/offset, 400 for an invalid partition,
/// 500 on produce failure, 503 when no broker is configured.
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Auto-create the topic with default configuration if it does not exist.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Caller-supplied partition wins; otherwise derive one from the key.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    let message = mockforge_kafka::partitions::KafkaMessage {
        // Placeholder; produce() returns the offset actually assigned.
        offset: 0,
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.into_bytes()))
            .collect(),
    };

    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // `broker` was already matched above; no need to re-check the Option.
            broker.metrics().record_messages_produced(1);

            // Broadcast to SSE subscribers; send errors (no receivers) are ignored.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2806
#[cfg(feature = "kafka")]
/// Request body for `POST /kafka/produce/batch`.
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, in order.
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay in milliseconds inserted between consecutive messages.
    /// Default comes from `default_delay` (defined elsewhere in this file).
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2817
#[cfg(feature = "kafka")]
/// `POST /kafka/produce/batch` — produce a sequence of messages, optionally
/// pausing `delay_ms` between consecutive messages.
///
/// Each message gets an individual entry in `results`; a per-message failure
/// (invalid partition, produce error) does not abort the rest of the batch.
///
/// Responses: 200 with per-message results, 400 for an empty batch,
/// 503 when no broker is configured.
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let mut results = Vec::with_capacity(request.messages.len());

    for (index, msg_request) in request.messages.iter().enumerate() {
        // Scope the topics write lock to a single message so it is released
        // before the optional inter-message delay below (previously the lock
        // was held across the sleep, blocking all other topic access).
        {
            let mut topics = broker.topics.write().await;

            // Auto-create missing topics with default configuration.
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            // Explicit partition wins; otherwise derive one from the key.
            let partition_id = match msg_request.partition {
                Some(partition) => partition,
                None => {
                    topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
                }
            };

            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                continue;
            }

            let message = mockforge_kafka::partitions::KafkaMessage {
                // Placeholder; produce() returns the offset actually assigned.
                offset: 0,
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.clone().map(String::into_bytes),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(k, v)| (k, v.into_bytes()))
                    .collect(),
            };

            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    // `broker` was already matched above; no need to re-check.
                    broker.metrics().record_messages_produced(1);

                    // Broadcast to SSE subscribers; ignore send errors (no receivers).
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }
        }

        // Optional pacing between messages (never after the last one).
        if index < request.messages.len() - 1 && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    Json(serde_json::json!({
        "success": true,
        "total": request.messages.len(),
        "succeeded": success_count,
        "failed": request.messages.len() - success_count,
        "results": results
    }))
    .into_response()
}
2946
#[cfg(feature = "mqtt")]
/// `GET /mqtt/messages/stream` — SSE stream of published MQTT messages.
///
/// Subscribes to the shared message broadcast channel and forwards only MQTT
/// events as `mqtt_message` SSE events. An optional `topic` query parameter
/// filters events to topics containing that substring.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    // Substring filter applied to the MQTT topic, if supplied.
    let topic_filter = params.get("topic").cloned();

    // unfold() drives the broadcast receiver: each step yields one SSE event
    // and hands the receiver back for the next step.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Skip events that don't match the topic filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failures silently drop the event and keep looping.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Kafka events share the same channel; ignore them here.
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        continue;
                    }
                    // Channel closed: end the stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Receiver fell behind the broadcast buffer: log and keep going.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Keep-alive pings every 15s so idle connections are not dropped by proxies.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3009
#[cfg(feature = "kafka")]
/// `GET /kafka/messages/stream` — SSE stream of produced Kafka messages.
///
/// Mirrors `mqtt_messages_stream`: subscribes to the shared broadcast channel,
/// forwards only Kafka events as `kafka_message` SSE events, and supports an
/// optional `topic` substring filter via the query string.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    // Substring filter applied to the Kafka topic, if supplied.
    let topic_filter = params.get("topic").cloned();

    // unfold() drives the broadcast receiver: each step yields one SSE event
    // and hands the receiver back for the next step.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    // MQTT events share the same channel; ignore them here.
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Skip events that don't match the topic filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failures silently drop the event and keep looping.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Channel closed: end the stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Receiver fell behind the broadcast buffer: log and keep going.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Keep-alive pings every 15s so idle connections are not dropped by proxies.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3073
/// Request body for `POST /ai/generate-spec`.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the API to generate.
    pub query: String,
    /// Target format: "openapi", "graphql", or "asyncapi" (unknown values fall back to OpenAPI).
    pub spec_type: String,
    /// OpenAPI version to target; defaults to "3.0.0" when omitted.
    pub api_version: Option<String>,
}
3086
/// Request body for `POST /mockai/generate-openapi`.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to `./recordings.db` when omitted.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Only include exchanges at or after this RFC 3339 timestamp.
    #[serde(default)]
    pub since: Option<String>,
    /// Only include exchanges at or before this RFC 3339 timestamp.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern filter applied when querying exchanges.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence for inferred paths; defaults to 0.7.
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
3106
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    0.7
}
3110
#[cfg(feature = "data-faker")]
/// `POST /ai/generate-spec` — generate an API specification from a
/// natural-language description via the configured RAG/LLM backend.
///
/// Provider, endpoint, model, and generation parameters come from the
/// `MOCKFORGE_RAG_*` environment variables, with `OPENAI_API_KEY` as an
/// API-key fallback. Responds with 503 when no API key is configured and
/// 500 when engine construction or generation fails.
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };
    // NOTE: `std::sync::Arc` is already imported at the top of this file, so
    // no function-local `use` is needed here.

    // API key: MOCKFORGE_RAG_API_KEY takes precedence, then OPENAI_API_KEY.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Provider selection; unknown values fall back to OpenAI.
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Endpoint and model default per provider unless overridden.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Generation parameters, each env-overridable with a sane default.
    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key,
        model,
        max_tokens: std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
            .unwrap_or_else(|_| "4096".to_string())
            .parse()
            .unwrap_or(4096),
        temperature: std::env::var("MOCKFORGE_RAG_TEMPERATURE")
            .unwrap_or_else(|_| "0.3".to_string())
            .parse()
            .unwrap_or(0.3),
        timeout_secs: std::env::var("MOCKFORGE_RAG_TIMEOUT")
            .unwrap_or_else(|_| "60".to_string())
            .parse()
            .unwrap_or(60),
        max_context_length: std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
            .unwrap_or_else(|_| "4000".to_string())
            .parse()
            .unwrap_or(4000),
        ..Default::default()
    };

    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // One-shot generation needs no persisted documents; in-memory storage suffices.
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // `rag_config` is not used again, so move it instead of cloning.
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Strip any surrounding prose/code fences from the model output.
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3283
3284#[cfg(not(feature = "data-faker"))]
3285async fn generate_ai_spec(
3286 State(_state): State<ManagementState>,
3287 Json(_request): Json<GenerateSpecRequest>,
3288) -> impl IntoResponse {
3289 (
3290 StatusCode::NOT_IMPLEMENTED,
3291 Json(serde_json::json!({
3292 "error": "AI features not enabled",
3293 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3294 })),
3295 )
3296 .into_response()
3297}
3298
#[cfg(feature = "behavioral-cloning")]
/// `POST /mockai/generate-openapi` — infer an OpenAPI specification from
/// previously recorded HTTP traffic.
///
/// Reads exchanges from the recorder database (request-supplied path or
/// `./recordings.db`), applies optional time-window and path filters, and
/// runs the spec generator over up to 1000 matching exchanges.
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Resolve the recorder database path; default to ./recordings.db.
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Parse the optional `since` bound (RFC 3339, normalized to UTC).
    // NOTE(review): the error text references CLI-style "--since"/"--until"
    // flags even though this is an HTTP endpoint — confirm intended wording.
    let since_dt = if let Some(ref since_str) = request.since {
        match DateTime::parse_from_rfc3339(since_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    // Parse the optional `until` bound the same way.
    let until_dt = if let Some(ref until_str) = request.until {
        match DateTime::parse_from_rfc3339(until_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    // Query filters; result size is capped at 1000 exchanges.
    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000),
    };

    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Convert recorder exchanges into the generator's local exchange type
    // (field-for-field move; the two types mirror each other).
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    // Generator configuration: request-supplied confidence threshold plus the
    // default behavior model.
    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prefer the generator's raw document when present; otherwise serialize
    // the typed spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    // Return the spec together with generation metadata.
    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3491
3492async fn list_rule_explanations(
3494 State(state): State<ManagementState>,
3495 Query(params): Query<std::collections::HashMap<String, String>>,
3496) -> impl IntoResponse {
3497 use mockforge_core::intelligent_behavior::RuleType;
3498
3499 let explanations = state.rule_explanations.read().await;
3500 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3501
3502 if let Some(rule_type_str) = params.get("rule_type") {
3504 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3505 explanations_vec.retain(|e| e.rule_type == rule_type);
3506 }
3507 }
3508
3509 if let Some(min_confidence_str) = params.get("min_confidence") {
3511 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3512 explanations_vec.retain(|e| e.confidence >= min_confidence);
3513 }
3514 }
3515
3516 explanations_vec.sort_by(|a, b| {
3518 b.confidence
3519 .partial_cmp(&a.confidence)
3520 .unwrap_or(std::cmp::Ordering::Equal)
3521 .then_with(|| b.generated_at.cmp(&a.generated_at))
3522 });
3523
3524 Json(serde_json::json!({
3525 "explanations": explanations_vec,
3526 "total": explanations_vec.len(),
3527 }))
3528 .into_response()
3529}
3530
3531async fn get_rule_explanation(
3533 State(state): State<ManagementState>,
3534 Path(rule_id): Path<String>,
3535) -> impl IntoResponse {
3536 let explanations = state.rule_explanations.read().await;
3537
3538 match explanations.get(&rule_id) {
3539 Some(explanation) => Json(serde_json::json!({
3540 "explanation": explanation,
3541 }))
3542 .into_response(),
3543 None => (
3544 StatusCode::NOT_FOUND,
3545 Json(serde_json::json!({
3546 "error": "Rule explanation not found",
3547 "message": format!("No explanation found for rule ID: {}", rule_id)
3548 })),
3549 )
3550 .into_response(),
3551 }
3552}
3553
/// Request body for `POST /mockai/learn`.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Request/response example pairs to learn rules from; must be non-empty.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional behavior configuration as raw JSON; invalid or missing values
    /// fall back to defaults.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3563
/// One request/response example used by `learn_from_examples`.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request description; keys read are `method`, `path`, `body`,
    /// `query_params`, and `headers` (all optional, with defaults applied).
    pub request: serde_json::Value,
    /// Response description; keys read are `status_code` (or `status`) and `body`.
    pub response: serde_json::Value,
}
3572
3573async fn learn_from_examples(
3578 State(state): State<ManagementState>,
3579 Json(request): Json<LearnFromExamplesRequest>,
3580) -> impl IntoResponse {
3581 use mockforge_core::intelligent_behavior::{
3582 config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3583 rule_generator::{ExamplePair, RuleGenerator},
3584 };
3585
3586 if request.examples.is_empty() {
3587 return (
3588 StatusCode::BAD_REQUEST,
3589 Json(serde_json::json!({
3590 "error": "No examples provided",
3591 "message": "At least one example pair is required"
3592 })),
3593 )
3594 .into_response();
3595 }
3596
3597 let example_pairs: Result<Vec<ExamplePair>, String> = request
3599 .examples
3600 .into_iter()
3601 .enumerate()
3602 .map(|(idx, ex)| {
3603 let method = ex
3605 .request
3606 .get("method")
3607 .and_then(|v| v.as_str())
3608 .map(|s| s.to_string())
3609 .unwrap_or_else(|| "GET".to_string());
3610 let path = ex
3611 .request
3612 .get("path")
3613 .and_then(|v| v.as_str())
3614 .map(|s| s.to_string())
3615 .unwrap_or_else(|| "/".to_string());
3616 let request_body = ex.request.get("body").cloned();
3617 let query_params = ex
3618 .request
3619 .get("query_params")
3620 .and_then(|v| v.as_object())
3621 .map(|obj| {
3622 obj.iter()
3623 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3624 .collect()
3625 })
3626 .unwrap_or_default();
3627 let headers = ex
3628 .request
3629 .get("headers")
3630 .and_then(|v| v.as_object())
3631 .map(|obj| {
3632 obj.iter()
3633 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3634 .collect()
3635 })
3636 .unwrap_or_default();
3637
3638 let status = ex
3640 .response
3641 .get("status_code")
3642 .or_else(|| ex.response.get("status"))
3643 .and_then(|v| v.as_u64())
3644 .map(|n| n as u16)
3645 .unwrap_or(200);
3646 let response_body = ex.response.get("body").cloned();
3647
3648 Ok(ExamplePair {
3649 method,
3650 path,
3651 request: request_body,
3652 status,
3653 response: response_body,
3654 query_params,
3655 headers,
3656 metadata: {
3657 let mut meta = std::collections::HashMap::new();
3658 meta.insert("source".to_string(), "api".to_string());
3659 meta.insert("example_index".to_string(), idx.to_string());
3660 meta
3661 },
3662 })
3663 })
3664 .collect();
3665
3666 let example_pairs = match example_pairs {
3667 Ok(pairs) => pairs,
3668 Err(e) => {
3669 return (
3670 StatusCode::BAD_REQUEST,
3671 Json(serde_json::json!({
3672 "error": "Invalid examples",
3673 "message": e
3674 })),
3675 )
3676 .into_response();
3677 }
3678 };
3679
3680 let behavior_config = if let Some(config_json) = request.config {
3682 serde_json::from_value(config_json)
3684 .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3685 .behavior_model
3686 } else {
3687 BehaviorModelConfig::default()
3688 };
3689
3690 let generator = RuleGenerator::new(behavior_config);
3692
3693 let (rules, explanations) =
3695 match generator.generate_rules_with_explanations(example_pairs).await {
3696 Ok(result) => result,
3697 Err(e) => {
3698 return (
3699 StatusCode::INTERNAL_SERVER_ERROR,
3700 Json(serde_json::json!({
3701 "error": "Rule generation failed",
3702 "message": format!("Failed to generate rules: {}", e)
3703 })),
3704 )
3705 .into_response();
3706 }
3707 };
3708
3709 {
3711 let mut stored_explanations = state.rule_explanations.write().await;
3712 for explanation in &explanations {
3713 stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3714 }
3715 }
3716
3717 let response = serde_json::json!({
3719 "success": true,
3720 "rules_generated": {
3721 "consistency_rules": rules.consistency_rules.len(),
3722 "schemas": rules.schemas.len(),
3723 "state_machines": rules.state_transitions.len(),
3724 "system_prompt": !rules.system_prompt.is_empty(),
3725 },
3726 "explanations": explanations.iter().map(|e| serde_json::json!({
3727 "rule_id": e.rule_id,
3728 "rule_type": e.rule_type,
3729 "confidence": e.confidence,
3730 "reasoning": e.reasoning,
3731 })).collect::<Vec<_>>(),
3732 "total_explanations": explanations.len(),
3733 });
3734
3735 Json(response).into_response()
3736}
3737
3738#[cfg(feature = "data-faker")]
/// Extract a YAML specification from free-form (typically LLM-generated) text.
///
/// Resolution order:
/// 1. content of a fenced ```yaml block,
/// 2. content of a generic ``` block,
/// 3. otherwise the whole input, trimmed.
///
/// The previous explicit `openapi:`/`asyncapi:` prefix branch returned the
/// same trimmed text as the final fallback, so it was redundant and has been
/// folded into it.
fn extract_yaml_spec(text: &str) -> String {
    // Returns the trimmed interior of the first block opened by `fence`,
    // or None when the fence is absent or unterminated.
    fn fenced(text: &str, fence: &str) -> Option<String> {
        let start = text.find(fence)? + fence.len();
        let inner = text[start..].trim_start();
        let end = inner.find("```")?;
        Some(inner[..end].trim().to_string())
    }

    fenced(text, "```yaml")
        .or_else(|| fenced(text, "```"))
        .unwrap_or_else(|| text.trim().to_string())
}
3762
3763#[cfg(feature = "data-faker")]
/// Extract a GraphQL schema from free-form (typically LLM-generated) text.
///
/// Resolution order:
/// 1. content of a fenced ```graphql block,
/// 2. content of a generic ``` block,
/// 3. otherwise the whole input, trimmed.
///
/// The previous explicit `type `/`schema ` prefix branch returned the same
/// trimmed text as the final fallback, so it was redundant and has been
/// folded into it.
fn extract_graphql_schema(text: &str) -> String {
    // Returns the trimmed interior of the first block opened by `fence`,
    // or None when the fence is absent or unterminated.
    fn fenced(text: &str, fence: &str) -> Option<String> {
        let start = text.find(fence)? + fence.len();
        let inner = text[start..].trim_start();
        let end = inner.find("```")?;
        Some(inner[..end].trim().to_string())
    }

    fenced(text, "```graphql")
        .or_else(|| fenced(text, "```"))
        .unwrap_or_else(|| text.trim().to_string())
}
3787
/// GET handler returning the current chaos engineering configuration.
///
/// With the `chaos` feature compiled in and a chaos API state available, the
/// live configuration sections are serialized; in every other case a
/// fully-disabled placeholder with the same JSON shape is returned, so
/// clients see a stable schema regardless of build features.
async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &_state.chaos_api_state {
            // Hold the read lock only while serializing; each optional
            // section degrades to JSON null if serialization fails.
            let config = chaos_state.config.read().await;
            Json(serde_json::json!({
                "enabled": config.enabled,
                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
            }))
            .into_response()
        } else {
            // Chaos feature compiled in but no chaos API state configured.
            Json(serde_json::json!({
                "enabled": false,
                "latency": null,
                "fault_injection": null,
                "rate_limit": null,
                "traffic_shaping": null,
            }))
            .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        // Chaos feature not compiled in: same disabled shape.
        Json(serde_json::json!({
            "enabled": false,
            "latency": null,
            "fault_injection": null,
            "rate_limit": null,
            "traffic_shaping": null,
        }))
        .into_response()
    }
}
3830
/// Partial update for the chaos engineering configuration.
///
/// Every field is optional; only the sections present in the request are
/// applied by `update_chaos_config`.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master on/off switch for chaos injection.
    pub enabled: Option<bool>,
    /// Latency section as raw JSON (deserialized into `LatencyConfig`).
    pub latency: Option<serde_json::Value>,
    /// Fault-injection section as raw JSON (deserialized into `FaultInjectionConfig`).
    pub fault_injection: Option<serde_json::Value>,
    /// Rate-limit section as raw JSON (deserialized into `RateLimitConfig`).
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic-shaping section as raw JSON (deserialized into `TrafficShapingConfig`).
    pub traffic_shaping: Option<serde_json::Value>,
}
3845
3846async fn update_chaos_config(
3848 State(_state): State<ManagementState>,
3849 Json(_config_update): Json<ChaosConfigUpdate>,
3850) -> impl IntoResponse {
3851 #[cfg(feature = "chaos")]
3852 {
3853 if let Some(chaos_state) = &_state.chaos_api_state {
3854 use mockforge_chaos::config::{
3855 FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig,
3856 };
3857
3858 let mut config = chaos_state.config.write().await;
3859
3860 if let Some(enabled) = _config_update.enabled {
3862 config.enabled = enabled;
3863 }
3864
3865 if let Some(latency_json) = _config_update.latency {
3867 if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3868 config.latency = Some(latency);
3869 }
3870 }
3871
3872 if let Some(fault_json) = _config_update.fault_injection {
3874 if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3875 config.fault_injection = Some(fault);
3876 }
3877 }
3878
3879 if let Some(rate_json) = _config_update.rate_limit {
3881 if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3882 config.rate_limit = Some(rate);
3883 }
3884 }
3885
3886 if let Some(traffic_json) = _config_update.traffic_shaping {
3888 if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3889 config.traffic_shaping = Some(traffic);
3890 }
3891 }
3892
3893 drop(config);
3896
3897 info!("Chaos configuration updated successfully");
3898 Json(serde_json::json!({
3899 "success": true,
3900 "message": "Chaos configuration updated and applied"
3901 }))
3902 .into_response()
3903 } else {
3904 (
3905 StatusCode::SERVICE_UNAVAILABLE,
3906 Json(serde_json::json!({
3907 "success": false,
3908 "error": "Chaos API not available",
3909 "message": "Chaos engineering is not enabled or configured"
3910 })),
3911 )
3912 .into_response()
3913 }
3914 }
3915 #[cfg(not(feature = "chaos"))]
3916 {
3917 (
3918 StatusCode::NOT_IMPLEMENTED,
3919 Json(serde_json::json!({
3920 "success": false,
3921 "error": "Chaos feature not enabled",
3922 "message": "Chaos engineering feature is not compiled into this build"
3923 })),
3924 )
3925 .into_response()
3926 }
3927}
3928
3929async fn list_network_profiles() -> impl IntoResponse {
3933 use mockforge_core::network_profiles::NetworkProfileCatalog;
3934
3935 let catalog = NetworkProfileCatalog::default();
3936 let profiles: Vec<serde_json::Value> = catalog
3937 .list_profiles_with_description()
3938 .iter()
3939 .map(|(name, description)| {
3940 serde_json::json!({
3941 "name": name,
3942 "description": description,
3943 })
3944 })
3945 .collect();
3946
3947 Json(serde_json::json!({
3948 "profiles": profiles
3949 }))
3950 .into_response()
3951}
3952
/// Request body for applying a named network profile.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of the profile to look up in the `NetworkProfileCatalog`.
    pub profile_name: String,
}
3959
/// POST handler that applies a named network profile from the catalog.
///
/// On a hit, the profile's traffic-shaping settings are written into the
/// server configuration (when available) and, with the `chaos` feature, into
/// the live chaos API state. Returns 404 when the profile name is unknown.
async fn apply_network_profile(
    State(state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        if let Some(server_config) = &state.server_config {
            let mut config = server_config.write().await;

            use mockforge_core::config::NetworkShapingConfig;

            // Enabled when either bandwidth shaping or burst loss is on.
            // `* 8` presumably converts bytes/sec to bits/sec for
            // bandwidth_limit_bps — TODO confirm the field's unit.
            // NOTE(review): max_connections is a hard-coded 1000 here but 0
            // in the chaos branch below — confirm both defaults are intended.
            let network_shaping = NetworkShapingConfig {
                enabled: profile.traffic_shaping.bandwidth.enabled
                    || profile.traffic_shaping.burst_loss.enabled,
                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                max_connections: 1000, };

            // Merge into an existing chaos config, or create a fresh one with
            // only traffic shaping populated.
            if let Some(ref mut chaos) = config.observability.chaos {
                chaos.traffic_shaping = Some(network_shaping);
            } else {
                use mockforge_core::config::ChaosEngConfig;
                config.observability.chaos = Some(ChaosEngConfig {
                    enabled: true,
                    latency: None,
                    fault_injection: None,
                    rate_limit: None,
                    traffic_shaping: Some(network_shaping),
                    scenario: None,
                });
            }

            info!("Network profile '{}' applied to server configuration", request.profile_name);
        } else {
            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
        }

        // Mirror the profile into the live chaos API state so it takes effect
        // without a restart (chaos builds only).
        #[cfg(feature = "chaos")]
        {
            if let Some(chaos_state) = &state.chaos_api_state {
                use mockforge_chaos::config::TrafficShapingConfig;

                let mut chaos_config = chaos_state.config.write().await;
                // Same enabled/bandwidth/loss mapping as above; see the unit
                // and max_connections notes there.
                let chaos_traffic_shaping = TrafficShapingConfig {
                    enabled: profile.traffic_shaping.bandwidth.enabled
                        || profile.traffic_shaping.burst_loss.enabled,
                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                    max_connections: 0,
                    connection_timeout_ms: 30000,
                };
                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
                chaos_config.enabled = true; drop(chaos_config);
                info!("Network profile '{}' applied to chaos API state", request.profile_name);
            }
        }

        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
            .into_response()
    }
}
4053
4054pub fn management_router_with_ui_builder(
4056 state: ManagementState,
4057 server_config: mockforge_core::config::ServerConfig,
4058) -> Router {
4059 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4060
4061 let management = management_router(state);
4063
4064 let ui_builder_state = UIBuilderState::new(server_config);
4066 let ui_builder = create_ui_builder_router(ui_builder_state);
4067
4068 management.nest("/ui-builder", ui_builder)
4070}
4071
4072pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4074 use crate::spec_import::{spec_import_router, SpecImportState};
4075
4076 let management = management_router(state);
4078
4079 Router::new()
4081 .merge(management)
4082 .merge(spec_import_router(SpecImportState::new()))
4083}
4084
#[cfg(test)]
mod tests {
    use super::*;

    /// A mock pushed into shared state should be retrievable by its id.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = MockConfig {
            id: "test-1".to_string(),
            name: "Test Mock".to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        // Insert under a write lock scoped so the guard drops before reading.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    /// Enabled vs. disabled mocks should be countable from shared state.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // Seed one enabled and one disabled mock.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(MockConfig {
                id: "1".to_string(),
                name: "Mock 1".to_string(),
                method: "GET".to_string(),
                path: "/test1".to_string(),
                response: MockResponse {
                    body: serde_json::json!({}),
                    headers: None,
                },
                enabled: true,
                latency_ms: None,
                status_code: Some(200),
                request_match: None,
                priority: None,
                scenario: None,
                required_scenario_state: None,
                new_scenario_state: None,
            });
            mocks.push(MockConfig {
                id: "2".to_string(),
                name: "Mock 2".to_string(),
                method: "POST".to_string(),
                path: "/test2".to_string(),
                response: MockResponse {
                    body: serde_json::json!({}),
                    headers: None,
                },
                enabled: false,
                latency_ms: None,
                status_code: Some(201),
                request_match: None,
                priority: None,
                scenario: None,
                required_scenario_state: None,
                new_scenario_state: None,
            });
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }

    /// An absolute XPath that exists in the XML body should match.
    #[test]
    fn test_mock_matches_request_with_xpath_absolute_path() {
        let mock = MockConfig {
            id: "xpath-1".to_string(),
            name: "XPath Match".to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some("/root/order/id".to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    /// An XPath text() predicate matching the body's value should match.
    #[test]
    fn test_mock_matches_request_with_xpath_text_predicate() {
        let mock = MockConfig {
            id: "xpath-2".to_string(),
            name: "XPath Predicate Match".to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some("//order/id[text()='123']".to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    /// An XPath predicate whose value differs from the body should not match.
    #[test]
    fn test_mock_matches_request_with_xpath_no_match() {
        let mock = MockConfig {
            id: "xpath-3".to_string(),
            name: "XPath No Match".to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some("//order/id[text()='456']".to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }
}