1use axum::{
7 extract::State,
8 http::StatusCode,
9 response::{
10 sse::{Event, Sse},
11 IntoResponse, Json,
12 },
13};
14use futures_util::stream::{self, Stream};
15use mockforge_core::graph::{GraphBuilder, GraphData};
16use mockforge_core::request_chaining::ChainDefinition;
17use serde_json::Value;
18use std::convert::Infallible;
19use std::time::Duration;
20
21use super::AdminState;
22use crate::models::ApiResponse;
23
24pub async fn get_graph(State(state): State<AdminState>) -> impl IntoResponse {
32 let mut builder = GraphBuilder::new();
33
34 if let Some(http_addr) = state.http_server_addr {
36 match fetch_chains_from_server(http_addr).await {
37 Ok(chains) => {
38 builder.from_chains(&chains);
39 }
40 Err(e) => {
41 tracing::warn!("Failed to fetch chains for graph: {}", e);
42 }
44 }
45 }
46
47 if let Some(http_addr) = state.http_server_addr {
49 match fetch_endpoints_from_ui_builder(http_addr).await {
50 Ok(endpoints) => {
51 for endpoint in endpoints {
53 let protocol_str = match endpoint.protocol {
54 mockforge_http::ui_builder::Protocol::Http => "http",
55 mockforge_http::ui_builder::Protocol::Grpc => "grpc",
56 mockforge_http::ui_builder::Protocol::Websocket => "websocket",
57 mockforge_http::ui_builder::Protocol::Graphql => "graphql",
58 mockforge_http::ui_builder::Protocol::Mqtt => "mqtt",
59 mockforge_http::ui_builder::Protocol::Smtp => "smtp",
60 mockforge_http::ui_builder::Protocol::Kafka => "kafka",
61 mockforge_http::ui_builder::Protocol::Amqp => "amqp",
62 mockforge_http::ui_builder::Protocol::Ftp => "ftp",
63 };
64
65 let mut metadata = std::collections::HashMap::new();
66 metadata.insert("enabled".to_string(), Value::Bool(endpoint.enabled));
67 if let Some(desc) = endpoint.description {
68 metadata.insert("description".to_string(), Value::String(desc));
69 }
70
71 if let mockforge_http::ui_builder::EndpointProtocolConfig::Http(http_config) =
73 &endpoint.config
74 {
75 metadata.insert(
76 "method".to_string(),
77 Value::String(http_config.method.clone()),
78 );
79 metadata
80 .insert("path".to_string(), Value::String(http_config.path.clone()));
81 }
82
83 let protocol = match protocol_str {
84 "http" => mockforge_core::graph::Protocol::Http,
85 "grpc" => mockforge_core::graph::Protocol::Grpc,
86 "websocket" => mockforge_core::graph::Protocol::Websocket,
87 "graphql" => mockforge_core::graph::Protocol::Graphql,
88 "mqtt" => mockforge_core::graph::Protocol::Mqtt,
89 "smtp" => mockforge_core::graph::Protocol::Smtp,
90 "kafka" => mockforge_core::graph::Protocol::Kafka,
91 "amqp" => mockforge_core::graph::Protocol::Amqp,
92 "ftp" => mockforge_core::graph::Protocol::Ftp,
93 _ => mockforge_core::graph::Protocol::Http,
94 };
95
96 builder.add_endpoint(endpoint.id, endpoint.name, protocol, metadata);
97 }
98 }
99 Err(e) => {
100 tracing::debug!("UI Builder endpoints not available: {}", e);
101 }
103 }
104 }
105
106 let graph_data = builder.build();
108
109 Json(ApiResponse::success(graph_data))
110}
111
112async fn fetch_endpoints_from_ui_builder(
114 http_addr: std::net::SocketAddr,
115) -> Result<Vec<mockforge_http::ui_builder::EndpointConfig>, String> {
116 let url = format!("http://{}/__mockforge/ui-builder/endpoints", http_addr);
117 let client = reqwest::Client::new();
118
119 let response = client
120 .get(&url)
121 .send()
122 .await
123 .map_err(|e| format!("Failed to fetch endpoints: {}", e))?;
124
125 if !response.status().is_success() {
126 return Err(format!("HTTP error: {}", response.status()));
127 }
128
129 let json: Value =
130 response.json().await.map_err(|e| format!("Failed to parse response: {}", e))?;
131
132 let endpoints_array = json
135 .get("endpoints")
136 .or_else(|| json.get("data").and_then(|d| d.get("endpoints")))
137 .and_then(|v| v.as_array())
138 .ok_or_else(|| "Invalid response format: endpoints array not found".to_string())?;
139
140 let mut endpoints = Vec::new();
141 for endpoint_value in endpoints_array {
142 if let Ok(endpoint) = serde_json::from_value::<mockforge_http::ui_builder::EndpointConfig>(
143 endpoint_value.clone(),
144 ) {
145 endpoints.push(endpoint);
146 }
147 }
148
149 Ok(endpoints)
150}
151
152pub async fn graph_sse(
154 State(state): State<AdminState>,
155) -> Sse<impl Stream<Item = std::result::Result<Event, Infallible>>> {
156 tracing::info!("SSE endpoint /graph/sse accessed - starting real-time graph updates");
157
158 let http_addr = state.http_server_addr;
160
161 let stream = stream::unfold((), move |_| {
162 let http_addr = http_addr.clone();
163 async move {
164 tokio::time::sleep(Duration::from_secs(5)).await; let mut builder = GraphBuilder::new();
168
169 if let Some(addr) = http_addr {
171 if let Ok(chains) = fetch_chains_from_server(addr).await {
172 builder.from_chains(&chains);
173 }
174
175 if let Ok(endpoints) = fetch_endpoints_from_ui_builder(addr).await {
177 for endpoint in endpoints {
178 let protocol_str = match endpoint.protocol {
179 mockforge_http::ui_builder::Protocol::Http => "http",
180 mockforge_http::ui_builder::Protocol::Grpc => "grpc",
181 mockforge_http::ui_builder::Protocol::Websocket => "websocket",
182 mockforge_http::ui_builder::Protocol::Graphql => "graphql",
183 mockforge_http::ui_builder::Protocol::Mqtt => "mqtt",
184 mockforge_http::ui_builder::Protocol::Smtp => "smtp",
185 mockforge_http::ui_builder::Protocol::Kafka => "kafka",
186 mockforge_http::ui_builder::Protocol::Amqp => "amqp",
187 mockforge_http::ui_builder::Protocol::Ftp => "ftp",
188 };
189
190 let mut metadata = std::collections::HashMap::new();
191 metadata.insert("enabled".to_string(), Value::Bool(endpoint.enabled));
192 if let Some(desc) = endpoint.description {
193 metadata.insert("description".to_string(), Value::String(desc));
194 }
195
196 if let mockforge_http::ui_builder::EndpointProtocolConfig::Http(
197 http_config,
198 ) = &endpoint.config
199 {
200 metadata.insert(
201 "method".to_string(),
202 Value::String(http_config.method.clone()),
203 );
204 metadata.insert(
205 "path".to_string(),
206 Value::String(http_config.path.clone()),
207 );
208 }
209
210 let protocol = match protocol_str {
211 "http" => mockforge_core::graph::Protocol::Http,
212 "grpc" => mockforge_core::graph::Protocol::Grpc,
213 "websocket" => mockforge_core::graph::Protocol::Websocket,
214 "graphql" => mockforge_core::graph::Protocol::Graphql,
215 "mqtt" => mockforge_core::graph::Protocol::Mqtt,
216 "smtp" => mockforge_core::graph::Protocol::Smtp,
217 "kafka" => mockforge_core::graph::Protocol::Kafka,
218 "amqp" => mockforge_core::graph::Protocol::Amqp,
219 "ftp" => mockforge_core::graph::Protocol::Ftp,
220 _ => mockforge_core::graph::Protocol::Http,
221 };
222
223 builder.add_endpoint(endpoint.id, endpoint.name, protocol, metadata);
224 }
225 }
226 }
227
228 let graph_data = builder.build();
229 let json_data = serde_json::to_string(&graph_data).unwrap_or_default();
230
231 Some((Ok(Event::default().data(json_data)), ()))
232 }
233 });
234
235 Sse::new(stream).keep_alive(
236 axum::response::sse::KeepAlive::new()
237 .interval(Duration::from_secs(15))
238 .text("keep-alive-text"),
239 )
240}
241
242async fn fetch_chains_from_server(
244 http_addr: std::net::SocketAddr,
245) -> Result<Vec<ChainDefinition>, String> {
246 let url = format!("http://{}/__mockforge/chains", http_addr);
247 let client = reqwest::Client::new();
248
249 let response = client
250 .get(&url)
251 .send()
252 .await
253 .map_err(|e| format!("Failed to fetch chains: {}", e))?;
254
255 if !response.status().is_success() {
256 return Err(format!("HTTP error: {}", response.status()));
257 }
258
259 let json: Value =
260 response.json().await.map_err(|e| format!("Failed to parse response: {}", e))?;
261
262 let chains_array = json
266 .get("chains")
267 .or_else(|| json.get("data").and_then(|d| d.get("chains")))
268 .and_then(|v| v.as_array())
269 .ok_or_else(|| "Invalid response format: chains array not found".to_string())?;
270
271 let mut chains = Vec::new();
272 for chain_value in chains_array {
273 if let Some(chain_id) = chain_value.get("id").and_then(|v| v.as_str()) {
276 match fetch_chain_details(http_addr, chain_id).await {
277 Ok(Some(chain)) => chains.push(chain),
278 Ok(None) => {
279 tracing::warn!("Chain {} not found, skipping", chain_id);
281 }
282 Err(e) => {
283 tracing::warn!("Failed to fetch chain {}: {}", chain_id, e);
284 if let Ok(chain) =
286 serde_json::from_value::<ChainDefinition>(chain_value.clone())
287 {
288 chains.push(chain);
289 }
290 }
291 }
292 }
293 }
294
295 Ok(chains)
296}
297
298async fn fetch_chain_details(
300 http_addr: std::net::SocketAddr,
301 chain_id: &str,
302) -> Result<Option<ChainDefinition>, String> {
303 let url = format!("http://{}/__mockforge/chains/{}", http_addr, chain_id);
304 let client = reqwest::Client::new();
305
306 let response = client
307 .get(&url)
308 .send()
309 .await
310 .map_err(|e| format!("Failed to fetch chain details: {}", e))?;
311
312 if response.status() == StatusCode::NOT_FOUND {
313 return Ok(None);
314 }
315
316 if !response.status().is_success() {
317 return Err(format!("HTTP error: {}", response.status()));
318 }
319
320 let json: Value =
321 response.json().await.map_err(|e| format!("Failed to parse response: {}", e))?;
322
323 let chain_value = json.get("chain").or_else(|| json.get("data")).unwrap_or(&json);
326
327 serde_json::from_value::<ChainDefinition>(chain_value.clone())
328 .map(Some)
329 .map_err(|e| format!("Failed to deserialize chain: {}", e))
330}
331
332#[cfg(test)]
333mod tests {
334 use super::*;
335 use std::net::SocketAddr;
336
337 #[test]
338 fn test_graph_builder_creation() {
339 let builder = GraphBuilder::new();
340 let graph = builder.build();
341 assert_eq!(graph.nodes.len(), 0);
342 assert_eq!(graph.edges.len(), 0);
343 assert_eq!(graph.clusters.len(), 0);
344 }
345}