jsonrpc_debugger/
proxy.rs

1use crate::app::{
2    AppMode, JsonRpcMessage, MessageDirection, PendingRequest, ProxyDecision, TransportType,
3};
4use anyhow::Result;
5use reqwest::Client;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex};
9use uuid::Uuid;
10
11use tokio::sync::{mpsc, oneshot};
12use warp::Filter;
13
/// State shared between the app and the proxy request handlers.
///
/// The struct is `Clone`; a copy is made for every incoming request.
#[derive(Clone)]
pub struct ProxyState {
    /// Current application mode. Requests are intercepted (held for a user
    /// decision) while this is `AppMode::Paused`.
    pub app_mode: Arc<Mutex<AppMode>>,
    /// Channel on which intercepted requests are handed over to the app.
    pub pending_sender: mpsc::UnboundedSender<PendingRequest>,
}
20
/// HTTP proxy that forwards JSON-RPC POST requests to a target server and
/// reports each request and response on a message channel.
pub struct ProxyServer {
    /// Local port the proxy listens on (bound on 127.0.0.1 by `start`).
    listen_port: u16,
    /// Base URL of the upstream server; the incoming request path is appended to it.
    target_url: String,
    /// Channel that receives every observed request and response message.
    message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
    /// HTTP client used for the upstream requests.
    client: Client,
    /// Optional shared state; when `None`, requests are never intercepted.
    proxy_state: Option<ProxyState>,
}
28
29impl ProxyServer {
30    pub fn new(
31        listen_port: u16,
32        target_url: String,
33        message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
34    ) -> Self {
35        // Configure client for higher concurrency
36        let client = Client::builder()
37            .pool_max_idle_per_host(50) // More idle connections
38            .pool_idle_timeout(std::time::Duration::from_secs(30))
39            .http2_max_frame_size(Some(16384)) // Larger frame size
40            .http2_keep_alive_interval(Some(std::time::Duration::from_secs(10)))
41            .build()
42            .unwrap_or_else(|_| Client::new()); // Fallback to default if config fails
43
44        Self {
45            listen_port,
46            target_url,
47            message_sender,
48            client,
49            proxy_state: None,
50        }
51    }
52
53    pub fn with_state(mut self, proxy_state: ProxyState) -> Self {
54        self.proxy_state = Some(proxy_state);
55        self
56    }
57
58    pub async fn start(&self) -> Result<()> {
59        let target_url = self.target_url.clone();
60        let client = self.client.clone();
61        let message_sender = self.message_sender.clone();
62        let proxy_state = self.proxy_state.clone();
63
64        let proxy_route = warp::path::full()
65            .and(warp::post())
66            .and(warp::header::headers_cloned())
67            .and(warp::body::json())
68            .and_then(
69                move |path: warp::path::FullPath, headers: warp::http::HeaderMap, body: Value| {
70                    let target_url = target_url.clone();
71                    let client = client.clone();
72                    let message_sender = message_sender.clone();
73                    let proxy_state = proxy_state.clone();
74
75                    async move {
76                        handle_proxy_request(
77                            path,
78                            headers,
79                            body,
80                            target_url,
81                            client,
82                            message_sender,
83                            proxy_state,
84                        )
85                        .await
86                    }
87                },
88            );
89
90        let cors = warp::cors()
91            .allow_any_origin()
92            .allow_headers(vec!["content-type", "authorization"])
93            .allow_methods(vec!["POST", "OPTIONS"]);
94
95        let routes = proxy_route.with(cors);
96
97        // Use a simpler approach - just run the server
98        // The task abort from main.rs will handle shutdown
99        warp::serve(routes)
100            .run(([127, 0, 0, 1], self.listen_port))
101            .await;
102
103        Ok(())
104    }
105}
106
107async fn handle_proxy_request(
108    path: warp::path::FullPath,
109    headers: warp::http::HeaderMap,
110    body: Value,
111    target_url: String,
112    client: Client,
113    message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
114    proxy_state: Option<ProxyState>,
115) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
116    // Convert headers to HashMap
117    let mut header_map = HashMap::new();
118    for (name, value) in headers.iter() {
119        if let Ok(value_str) = value.to_str() {
120            header_map.insert(name.to_string(), value_str.to_string());
121        }
122    }
123
124    // Log the incoming request
125    let request_message = JsonRpcMessage {
126        id: body.get("id").cloned(),
127        method: body
128            .get("method")
129            .and_then(|m| m.as_str())
130            .map(String::from),
131        params: body.get("params").cloned(),
132        result: None,
133        error: None,
134        timestamp: std::time::SystemTime::now(),
135        direction: MessageDirection::Request,
136        transport: TransportType::Http,
137        headers: Some(header_map.clone()),
138    };
139
140    let _ = message_sender.send(request_message.clone());
141
142    // Check if we're in pause mode and should intercept the request
143    if let Some(ref state) = proxy_state {
144        let should_intercept = if let Ok(app_mode) = state.app_mode.lock() {
145            matches!(*app_mode, AppMode::Paused)
146        } else {
147            false
148        };
149
150        if should_intercept {
151            // Create oneshot channel for decision
152            let (decision_sender, decision_receiver) = oneshot::channel();
153
154            // Create a pending request
155            let pending_request = PendingRequest {
156                id: Uuid::new_v4().to_string(),
157                original_request: request_message,
158                modified_request: None,
159                modified_headers: None,
160                decision_sender,
161            };
162
163            // Send to app for interception
164            let _ = state.pending_sender.send(pending_request);
165
166            // Wait for user decision with timeout
167            let decision = tokio::time::timeout(
168                std::time::Duration::from_secs(300), // 5 minute timeout
169                decision_receiver,
170            )
171            .await;
172
173            return match decision {
174                Ok(Ok(ProxyDecision::Allow(modified_json, modified_headers))) => {
175                    // Use modified JSON if provided, otherwise use original body
176                    let request_body = modified_json.unwrap_or(body);
177
178                    // Use modified headers if provided, otherwise use original headers
179                    let final_headers = if let Some(mod_headers) = modified_headers {
180                        // Convert HashMap to HeaderMap
181                        let mut header_map = warp::http::HeaderMap::new();
182                        for (key, value) in mod_headers {
183                            if let (Ok(header_name), Ok(header_value)) = (
184                                warp::http::header::HeaderName::from_bytes(key.as_bytes()),
185                                warp::http::header::HeaderValue::from_str(&value),
186                            ) {
187                                header_map.insert(header_name, header_value);
188                            }
189                        }
190                        header_map
191                    } else {
192                        headers
193                    };
194
195                    forward_request(
196                        final_headers,
197                        request_body,
198                        format!("{}{}", target_url, path.as_str()),
199                        client,
200                        message_sender,
201                    )
202                    .await
203                }
204                Ok(Ok(ProxyDecision::Block)) => {
205                    // Return blocked response
206                    Ok(Box::new(warp::reply::with_status(
207                        warp::reply::json(&serde_json::json!({
208                            "jsonrpc": "2.0",
209                            "id": body.get("id"),
210                            "error": {
211                                "code": -32603,
212                                "message": "Request blocked by user"
213                            }
214                        })),
215                        warp::http::StatusCode::OK,
216                    )))
217                }
218                Ok(Ok(ProxyDecision::Complete(response_json))) => {
219                    // Log the custom response
220                    let response_message = JsonRpcMessage {
221                        id: response_json.get("id").cloned(),
222                        method: None,
223                        params: None,
224                        result: response_json.get("result").cloned(),
225                        error: response_json.get("error").cloned(),
226                        timestamp: std::time::SystemTime::now(),
227                        direction: MessageDirection::Response,
228                        transport: TransportType::Http,
229                        headers: Some(HashMap::from([
230                            ("content-type".to_string(), "application/json".to_string()),
231                            ("x-proxy-completed".to_string(), "true".to_string()),
232                        ])),
233                    };
234
235                    let _ = message_sender.send(response_message);
236
237                    // Return the custom response
238                    Ok(Box::new(warp::reply::with_status(
239                        warp::reply::json(&response_json),
240                        warp::http::StatusCode::OK,
241                    )))
242                }
243                Ok(Err(_)) | Err(_) => {
244                    // Timeout or channel error - return timeout response
245                    Ok(Box::new(warp::reply::with_status(
246                        warp::reply::json(&serde_json::json!({
247                            "jsonrpc": "2.0",
248                            "id": body.get("id"),
249                            "error": {
250                                "code": -32603,
251                                "message": "Request timed out waiting for user decision"
252                            }
253                        })),
254                        warp::http::StatusCode::REQUEST_TIMEOUT,
255                    )))
256                }
257            };
258        }
259    }
260
261    // Normal forwarding (not intercepted)
262    forward_request(
263        headers,
264        body,
265        format!("{}{}", target_url, path.as_str()),
266        client,
267        message_sender,
268    )
269    .await
270}
271
272async fn forward_request(
273    headers: warp::http::HeaderMap,
274    body: Value,
275    target_url: String,
276    client: Client,
277    message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
278) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
279    // Forward the request to the target
280    let mut request_builder = client.post(&target_url).json(&body);
281
282    // Forward relevant headers
283    for (name, value) in headers.iter() {
284        if should_forward_header(name.as_str()) {
285            request_builder = request_builder.header(name, value);
286        }
287    }
288
289    match request_builder.send().await {
290        Ok(response) => {
291            let status = response.status();
292            let response_headers = response.headers().clone();
293
294            // Convert response headers
295            let mut response_header_map = HashMap::new();
296            for (name, value) in response_headers.iter() {
297                if let Ok(value_str) = value.to_str() {
298                    response_header_map.insert(name.to_string(), value_str.to_string());
299                }
300            }
301
302            // Get the response text - reqwest should handle decompression automatically
303            match response.text().await {
304                Ok(response_text) => {
305                    // Try to parse as JSON
306                    match serde_json::from_str::<Value>(&response_text) {
307                        Ok(response_body) => {
308                            // Valid JSON response
309                            let response_message = JsonRpcMessage {
310                                id: response_body.get("id").cloned(),
311                                method: None,
312                                params: None,
313                                result: response_body.get("result").cloned(),
314                                error: response_body.get("error").cloned(),
315                                timestamp: std::time::SystemTime::now(),
316                                direction: MessageDirection::Response,
317                                transport: TransportType::Http,
318                                headers: Some(response_header_map.clone()),
319                            };
320
321                            let _ = message_sender.send(response_message);
322
323                            // Return the original response as-is
324                            Ok(Box::new(warp::reply::with_status(
325                                warp::reply::json(&response_body),
326                                status,
327                            )))
328                        }
329                        Err(parse_error) => {
330                            // Not valid JSON - analyze the response to provide better error info
331                            let content_type = response_header_map
332                                .get("content-type")
333                                .unwrap_or(&"unknown".to_string())
334                                .clone();
335
336                            // Check if response contains null bytes (binary data)
337                            let has_null_bytes = response_text.contains('\0');
338                            let is_empty = response_text.trim().is_empty();
339
340                            // Get a safe preview of the response content
341                            let content_preview = if has_null_bytes {
342                                // Show hex representation for binary data
343                                let bytes: Vec<u8> = response_text.bytes().take(50).collect();
344                                format!("Binary data: {:02x?}...", bytes)
345                            } else if response_text.trim().starts_with('{')
346                                || response_text.trim().starts_with('[')
347                            {
348                                // For JSON-like content, show more text
349                                if response_text.len() > 500 {
350                                    format!("{}...", &response_text[..500])
351                                } else {
352                                    response_text.clone()
353                                }
354                            } else if response_text.len() > 200 {
355                                format!("{}...", &response_text[..200])
356                            } else {
357                                response_text.clone()
358                            };
359
360                            // Determine the likely issue
361                            let issue_type = if is_empty {
362                                "empty_response"
363                            } else if has_null_bytes {
364                                "binary_data"
365                            } else if content_type.contains("text/html") {
366                                "html_response"
367                            } else if content_type.contains("application/json") {
368                                "malformed_json"
369                            } else {
370                                "unknown_format"
371                            };
372
373                            let error_message = JsonRpcMessage {
374                                id: body.get("id").cloned(),
375                                method: None,
376                                params: None,
377                                result: None,
378                                error: Some(serde_json::json!({
379                                    "code": -32700,
380                                    "message": format!("Invalid JSON response from server (HTTP {})", status),
381                                    "data": {
382                                        "issue_type": issue_type,
383                                        "content_type": content_type,
384                                        "response_preview": content_preview,
385                                        "response_length": response_text.len(),
386                                        "has_null_bytes": has_null_bytes,
387                                        "parse_error": parse_error.to_string(),
388                                        "target_url": target_url
389                                    }
390                                })),
391                                timestamp: std::time::SystemTime::now(),
392                                direction: MessageDirection::Response,
393                                transport: TransportType::Http,
394                                headers: Some(response_header_map.clone()),
395                            };
396
397                            let _ = message_sender.send(error_message);
398
399                            // Return a proper JSON-RPC error response
400                            Ok(Box::new(warp::reply::with_status(
401                                warp::reply::json(&serde_json::json!({
402                                    "jsonrpc": "2.0",
403                                    "id": body.get("id"),
404                                    "error": {
405                                        "code": -32700,
406                                        "message": format!("Invalid JSON response from server (HTTP {})", status),
407                                        "data": {
408                                            "issue_type": issue_type,
409                                            "content_type": content_type,
410                                            "has_null_bytes": has_null_bytes
411                                        }
412                                    }
413                                })),
414                                warp::http::StatusCode::OK, // Return 200 with JSON-RPC error
415                            )))
416                        }
417                    }
418                }
419                Err(_e) => {
420                    // Log error response
421                    let error_message = JsonRpcMessage {
422                        id: body.get("id").cloned(),
423                        method: None,
424                        params: None,
425                        result: None,
426                        error: Some(serde_json::json!({
427                            "code": -32603,
428                            "message": "Internal error - failed to read response"
429                        })),
430                        timestamp: std::time::SystemTime::now(),
431                        direction: MessageDirection::Response,
432                        transport: TransportType::Http,
433                        headers: Some(response_header_map),
434                    };
435
436                    let _ = message_sender.send(error_message);
437
438                    Ok(Box::new(warp::reply::with_status(
439                        warp::reply::json(&serde_json::json!({
440                            "jsonrpc": "2.0",
441                            "id": body.get("id"),
442                            "error": {
443                                "code": -32603,
444                                "message": "Internal error - failed to read response"
445                            }
446                        })),
447                        warp::http::StatusCode::INTERNAL_SERVER_ERROR,
448                    )))
449                }
450            }
451        }
452        Err(_e) => {
453            // Log connection error
454            let error_message = JsonRpcMessage {
455                id: body.get("id").cloned(),
456                method: None,
457                params: None,
458                result: None,
459                error: Some(serde_json::json!({
460                    "code": -32603,
461                    "message": "Failed to connect to target server"
462                })),
463                timestamp: std::time::SystemTime::now(),
464                direction: MessageDirection::Response,
465                transport: TransportType::Http,
466                headers: None,
467            };
468
469            let _ = message_sender.send(error_message);
470
471            Ok(Box::new(warp::reply::with_status(
472                warp::reply::json(&serde_json::json!({
473                    "jsonrpc": "2.0",
474                    "id": body.get("id"),
475                    "error": {
476                        "code": -32603,
477                        "message": "Failed to connect to target server"
478                    }
479                })),
480                warp::http::StatusCode::BAD_GATEWAY,
481            )))
482        }
483    }
484}
485
/// Decides whether a client request header is passed on to the target server.
///
/// Returns `false` for headers that describe only the client-to-proxy
/// connection or that the HTTP client recomputes for the outgoing request:
/// `host`, `content-length`, and the hop-by-hop headers (`connection`,
/// `transfer-encoding`, `keep-alive`, `proxy-connection`, `te`, `trailer`,
/// `upgrade`, `proxy-authenticate`, `proxy-authorization`). The comparison is
/// ASCII case-insensitive. Every other header is forwarded.
fn should_forward_header(header_name: &str) -> bool {
    const NOT_FORWARDED: [&str; 11] = [
        "host",
        "content-length",
        "transfer-encoding",
        "connection",
        "keep-alive",
        "proxy-connection",
        "te",
        "trailer",
        "upgrade",
        "proxy-authenticate",
        "proxy-authorization",
    ];

    // `eq_ignore_ascii_case` avoids allocating a lowercased copy per header.
    !NOT_FORWARDED
        .iter()
        .any(|blocked| header_name.eq_ignore_ascii_case(blocked))
}