jsonrpc_debugger/
proxy.rs

1use crate::app::{
2    AppMode, JsonRpcMessage, MessageDirection, PendingRequest, ProxyDecision, TransportType,
3};
4use anyhow::Result;
5use reqwest::Client;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex};
9use uuid::Uuid;
10
11use tokio::sync::{mpsc, oneshot};
12use warp::Filter;
13
14// Shared state between app and proxy
15#[derive(Clone)]
16pub struct ProxyState {
17    pub app_mode: Arc<Mutex<AppMode>>,
18    pub pending_sender: mpsc::UnboundedSender<PendingRequest>,
19}
20
21pub struct ProxyServer {
22    listen_port: u16,
23    target_url: String,
24    message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
25    client: Client,
26    proxy_state: Option<ProxyState>,
27}
28
29impl ProxyServer {
30    pub fn new(
31        listen_port: u16,
32        target_url: String,
33        message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
34    ) -> Self {
35        Self {
36            listen_port,
37            target_url,
38            message_sender,
39            client: Client::new(),
40            proxy_state: None,
41        }
42    }
43
44    pub fn with_state(mut self, proxy_state: ProxyState) -> Self {
45        self.proxy_state = Some(proxy_state);
46        self
47    }
48
49    pub async fn start(&self) -> Result<()> {
50        let target_url = self.target_url.clone();
51        let client = self.client.clone();
52        let message_sender = self.message_sender.clone();
53        let proxy_state = self.proxy_state.clone();
54
55        let proxy_route = warp::path::full()
56            .and(warp::post())
57            .and(warp::header::headers_cloned())
58            .and(warp::body::json())
59            .and_then(
60                move |path: warp::path::FullPath, headers: warp::http::HeaderMap, body: Value| {
61                    let target_url = target_url.clone();
62                    let client = client.clone();
63                    let message_sender = message_sender.clone();
64                    let proxy_state = proxy_state.clone();
65
66                    async move {
67                        handle_proxy_request(
68                            path,
69                            headers,
70                            body,
71                            target_url,
72                            client,
73                            message_sender,
74                            proxy_state,
75                        )
76                        .await
77                    }
78                },
79            );
80
81        let cors = warp::cors()
82            .allow_any_origin()
83            .allow_headers(vec!["content-type", "authorization"])
84            .allow_methods(vec!["POST", "OPTIONS"]);
85
86        let routes = proxy_route.with(cors);
87
88        // Use a simpler approach - just run the server
89        // The task abort from main.rs will handle shutdown
90        warp::serve(routes)
91            .run(([127, 0, 0, 1], self.listen_port))
92            .await;
93
94        Ok(())
95    }
96}
97
98async fn handle_proxy_request(
99    path: warp::path::FullPath,
100    headers: warp::http::HeaderMap,
101    body: Value,
102    target_url: String,
103    client: Client,
104    message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
105    proxy_state: Option<ProxyState>,
106) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
107    // Convert headers to HashMap
108    let mut header_map = HashMap::new();
109    for (name, value) in headers.iter() {
110        if let Ok(value_str) = value.to_str() {
111            header_map.insert(name.to_string(), value_str.to_string());
112        }
113    }
114
115    // Log the incoming request
116    let request_message = JsonRpcMessage {
117        id: body.get("id").cloned(),
118        method: body
119            .get("method")
120            .and_then(|m| m.as_str())
121            .map(String::from),
122        params: body.get("params").cloned(),
123        result: None,
124        error: None,
125        timestamp: std::time::SystemTime::now(),
126        direction: MessageDirection::Request,
127        transport: TransportType::Http,
128        headers: Some(header_map.clone()),
129    };
130
131    let _ = message_sender.send(request_message.clone());
132
133    // Check if we're in pause mode and should intercept the request
134    if let Some(ref state) = proxy_state {
135        let should_intercept = if let Ok(app_mode) = state.app_mode.lock() {
136            matches!(*app_mode, AppMode::Paused)
137        } else {
138            false
139        };
140
141        if should_intercept {
142            // Create oneshot channel for decision
143            let (decision_sender, decision_receiver) = oneshot::channel();
144
145            // Create a pending request
146            let pending_request = PendingRequest {
147                id: Uuid::new_v4().to_string(),
148                original_request: request_message,
149                modified_request: None,
150                modified_headers: None,
151                decision_sender,
152            };
153
154            // Send to app for interception
155            let _ = state.pending_sender.send(pending_request);
156
157            // Wait for user decision with timeout
158            let decision = tokio::time::timeout(
159                std::time::Duration::from_secs(300), // 5 minute timeout
160                decision_receiver,
161            )
162            .await;
163
164            return match decision {
165                Ok(Ok(ProxyDecision::Allow(modified_json, modified_headers))) => {
166                    // Use modified JSON if provided, otherwise use original body
167                    let request_body = modified_json.unwrap_or(body);
168
169                    // Use modified headers if provided, otherwise use original headers
170                    let final_headers = if let Some(mod_headers) = modified_headers {
171                        // Convert HashMap to HeaderMap
172                        let mut header_map = warp::http::HeaderMap::new();
173                        for (key, value) in mod_headers {
174                            if let (Ok(header_name), Ok(header_value)) = (
175                                warp::http::header::HeaderName::from_bytes(key.as_bytes()),
176                                warp::http::header::HeaderValue::from_str(&value),
177                            ) {
178                                header_map.insert(header_name, header_value);
179                            }
180                        }
181                        header_map
182                    } else {
183                        headers
184                    };
185
186                    forward_request(
187                        final_headers,
188                        request_body,
189                        format!("{}{}", target_url, path.as_str()),
190                        client,
191                        message_sender,
192                    )
193                    .await
194                }
195                Ok(Ok(ProxyDecision::Block)) => {
196                    // Return blocked response
197                    Ok(Box::new(warp::reply::with_status(
198                        warp::reply::json(&serde_json::json!({
199                            "jsonrpc": "2.0",
200                            "id": body.get("id"),
201                            "error": {
202                                "code": -32603,
203                                "message": "Request blocked by user"
204                            }
205                        })),
206                        warp::http::StatusCode::OK,
207                    )))
208                }
209                Ok(Ok(ProxyDecision::Complete(response_json))) => {
210                    // Log the custom response
211                    let response_message = JsonRpcMessage {
212                        id: response_json.get("id").cloned(),
213                        method: None,
214                        params: None,
215                        result: response_json.get("result").cloned(),
216                        error: response_json.get("error").cloned(),
217                        timestamp: std::time::SystemTime::now(),
218                        direction: MessageDirection::Response,
219                        transport: TransportType::Http,
220                        headers: Some(HashMap::from([
221                            ("content-type".to_string(), "application/json".to_string()),
222                            ("x-proxy-completed".to_string(), "true".to_string()),
223                        ])),
224                    };
225
226                    let _ = message_sender.send(response_message);
227
228                    // Return the custom response
229                    Ok(Box::new(warp::reply::with_status(
230                        warp::reply::json(&response_json),
231                        warp::http::StatusCode::OK,
232                    )))
233                }
234                Ok(Err(_)) | Err(_) => {
235                    // Timeout or channel error - return timeout response
236                    Ok(Box::new(warp::reply::with_status(
237                        warp::reply::json(&serde_json::json!({
238                            "jsonrpc": "2.0",
239                            "id": body.get("id"),
240                            "error": {
241                                "code": -32603,
242                                "message": "Request timed out waiting for user decision"
243                            }
244                        })),
245                        warp::http::StatusCode::REQUEST_TIMEOUT,
246                    )))
247                }
248            };
249        }
250    }
251
252    // Normal forwarding (not intercepted)
253    forward_request(
254        headers,
255        body,
256        format!("{}{}", target_url, path.as_str()),
257        client,
258        message_sender,
259    )
260    .await
261}
262
263async fn forward_request(
264    headers: warp::http::HeaderMap,
265    body: Value,
266    target_url: String,
267    client: Client,
268    message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
269) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
270    // Forward the request to the target
271    let mut request_builder = client.post(&target_url).json(&body);
272
273    // Forward relevant headers
274    for (name, value) in headers.iter() {
275        if should_forward_header(name.as_str()) {
276            request_builder = request_builder.header(name, value);
277        }
278    }
279
280    match request_builder.send().await {
281        Ok(response) => {
282            let status = response.status();
283            let response_headers = response.headers().clone();
284
285            // Convert response headers
286            let mut response_header_map = HashMap::new();
287            for (name, value) in response_headers.iter() {
288                if let Ok(value_str) = value.to_str() {
289                    response_header_map.insert(name.to_string(), value_str.to_string());
290                }
291            }
292
293            // Get the response text - reqwest should handle decompression automatically
294            match response.text().await {
295                Ok(response_text) => {
296                    // Try to parse as JSON
297                    match serde_json::from_str::<Value>(&response_text) {
298                        Ok(response_body) => {
299                            // Valid JSON response
300                            let response_message = JsonRpcMessage {
301                                id: response_body.get("id").cloned(),
302                                method: None,
303                                params: None,
304                                result: response_body.get("result").cloned(),
305                                error: response_body.get("error").cloned(),
306                                timestamp: std::time::SystemTime::now(),
307                                direction: MessageDirection::Response,
308                                transport: TransportType::Http,
309                                headers: Some(response_header_map.clone()),
310                            };
311
312                            let _ = message_sender.send(response_message);
313
314                            // Return the original response as-is
315                            Ok(Box::new(warp::reply::with_status(
316                                warp::reply::json(&response_body),
317                                status,
318                            )))
319                        }
320                        Err(parse_error) => {
321                            // Not valid JSON - analyze the response to provide better error info
322                            let content_type = response_header_map
323                                .get("content-type")
324                                .unwrap_or(&"unknown".to_string())
325                                .clone();
326
327                            // Check if response contains null bytes (binary data)
328                            let has_null_bytes = response_text.contains('\0');
329                            let is_empty = response_text.trim().is_empty();
330
331                            // Get a safe preview of the response content
332                            let content_preview = if has_null_bytes {
333                                // Show hex representation for binary data
334                                let bytes: Vec<u8> = response_text.bytes().take(50).collect();
335                                format!("Binary data: {:02x?}...", bytes)
336                            } else if response_text.trim().starts_with('{')
337                                || response_text.trim().starts_with('[')
338                            {
339                                // For JSON-like content, show more text
340                                if response_text.len() > 500 {
341                                    format!("{}...", &response_text[..500])
342                                } else {
343                                    response_text.clone()
344                                }
345                            } else if response_text.len() > 200 {
346                                format!("{}...", &response_text[..200])
347                            } else {
348                                response_text.clone()
349                            };
350
351                            // Determine the likely issue
352                            let issue_type = if is_empty {
353                                "empty_response"
354                            } else if has_null_bytes {
355                                "binary_data"
356                            } else if content_type.contains("text/html") {
357                                "html_response"
358                            } else if content_type.contains("application/json") {
359                                "malformed_json"
360                            } else {
361                                "unknown_format"
362                            };
363
364                            let error_message = JsonRpcMessage {
365                                id: body.get("id").cloned(),
366                                method: None,
367                                params: None,
368                                result: None,
369                                error: Some(serde_json::json!({
370                                    "code": -32700,
371                                    "message": format!("Invalid JSON response from server (HTTP {})", status),
372                                    "data": {
373                                        "issue_type": issue_type,
374                                        "content_type": content_type,
375                                        "response_preview": content_preview,
376                                        "response_length": response_text.len(),
377                                        "has_null_bytes": has_null_bytes,
378                                        "parse_error": parse_error.to_string(),
379                                        "target_url": target_url
380                                    }
381                                })),
382                                timestamp: std::time::SystemTime::now(),
383                                direction: MessageDirection::Response,
384                                transport: TransportType::Http,
385                                headers: Some(response_header_map.clone()),
386                            };
387
388                            let _ = message_sender.send(error_message);
389
390                            // Return a proper JSON-RPC error response
391                            Ok(Box::new(warp::reply::with_status(
392                                warp::reply::json(&serde_json::json!({
393                                    "jsonrpc": "2.0",
394                                    "id": body.get("id"),
395                                    "error": {
396                                        "code": -32700,
397                                        "message": format!("Invalid JSON response from server (HTTP {})", status),
398                                        "data": {
399                                            "issue_type": issue_type,
400                                            "content_type": content_type,
401                                            "has_null_bytes": has_null_bytes
402                                        }
403                                    }
404                                })),
405                                warp::http::StatusCode::OK, // Return 200 with JSON-RPC error
406                            )))
407                        }
408                    }
409                }
410                Err(_e) => {
411                    // Log error response
412                    let error_message = JsonRpcMessage {
413                        id: body.get("id").cloned(),
414                        method: None,
415                        params: None,
416                        result: None,
417                        error: Some(serde_json::json!({
418                            "code": -32603,
419                            "message": "Internal error - failed to read response"
420                        })),
421                        timestamp: std::time::SystemTime::now(),
422                        direction: MessageDirection::Response,
423                        transport: TransportType::Http,
424                        headers: Some(response_header_map),
425                    };
426
427                    let _ = message_sender.send(error_message);
428
429                    Ok(Box::new(warp::reply::with_status(
430                        warp::reply::json(&serde_json::json!({
431                            "jsonrpc": "2.0",
432                            "id": body.get("id"),
433                            "error": {
434                                "code": -32603,
435                                "message": "Internal error - failed to read response"
436                            }
437                        })),
438                        warp::http::StatusCode::INTERNAL_SERVER_ERROR,
439                    )))
440                }
441            }
442        }
443        Err(_e) => {
444            // Log connection error
445            let error_message = JsonRpcMessage {
446                id: body.get("id").cloned(),
447                method: None,
448                params: None,
449                result: None,
450                error: Some(serde_json::json!({
451                    "code": -32603,
452                    "message": "Failed to connect to target server"
453                })),
454                timestamp: std::time::SystemTime::now(),
455                direction: MessageDirection::Response,
456                transport: TransportType::Http,
457                headers: None,
458            };
459
460            let _ = message_sender.send(error_message);
461
462            Ok(Box::new(warp::reply::with_status(
463                warp::reply::json(&serde_json::json!({
464                    "jsonrpc": "2.0",
465                    "id": body.get("id"),
466                    "error": {
467                        "code": -32603,
468                        "message": "Failed to connect to target server"
469                    }
470                })),
471                warp::http::StatusCode::BAD_GATEWAY,
472            )))
473        }
474    }
475}
476
477fn should_forward_header(header_name: &str) -> bool {
478    !matches!(
479        header_name.to_lowercase().as_str(),
480        "host" | "content-length" | "transfer-encoding" | "connection"
481    )
482}