1use crate::app::{
2 AppMode, JsonRpcMessage, MessageDirection, PendingRequest, ProxyDecision, TransportType,
3};
4use anyhow::Result;
5use reqwest::Client;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex};
9use uuid::Uuid;
10
11use tokio::sync::{mpsc, oneshot};
12use warp::Filter;
13
14#[derive(Clone)]
16pub struct ProxyState {
17 pub app_mode: Arc<Mutex<AppMode>>,
18 pub pending_sender: mpsc::UnboundedSender<PendingRequest>,
19}
20
21pub struct ProxyServer {
22 listen_port: u16,
23 target_url: String,
24 message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
25 client: Client,
26 proxy_state: Option<ProxyState>,
27}
28
29impl ProxyServer {
30 pub fn new(
31 listen_port: u16,
32 target_url: String,
33 message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
34 ) -> Self {
35 let client = Client::builder()
37 .pool_max_idle_per_host(50) .pool_idle_timeout(std::time::Duration::from_secs(30))
39 .http2_max_frame_size(Some(16384)) .http2_keep_alive_interval(Some(std::time::Duration::from_secs(10)))
41 .build()
42 .unwrap_or_else(|_| Client::new()); Self {
45 listen_port,
46 target_url,
47 message_sender,
48 client,
49 proxy_state: None,
50 }
51 }
52
53 pub fn with_state(mut self, proxy_state: ProxyState) -> Self {
54 self.proxy_state = Some(proxy_state);
55 self
56 }
57
58 pub async fn start(&self) -> Result<()> {
59 let target_url = self.target_url.clone();
60 let client = self.client.clone();
61 let message_sender = self.message_sender.clone();
62 let proxy_state = self.proxy_state.clone();
63
64 let proxy_route = warp::path::full()
65 .and(warp::post())
66 .and(warp::header::headers_cloned())
67 .and(warp::body::json())
68 .and_then(
69 move |path: warp::path::FullPath, headers: warp::http::HeaderMap, body: Value| {
70 let target_url = target_url.clone();
71 let client = client.clone();
72 let message_sender = message_sender.clone();
73 let proxy_state = proxy_state.clone();
74
75 async move {
76 handle_proxy_request(
77 path,
78 headers,
79 body,
80 target_url,
81 client,
82 message_sender,
83 proxy_state,
84 )
85 .await
86 }
87 },
88 );
89
90 let cors = warp::cors()
91 .allow_any_origin()
92 .allow_headers(vec!["content-type", "authorization"])
93 .allow_methods(vec!["POST", "OPTIONS"]);
94
95 let routes = proxy_route.with(cors);
96
97 warp::serve(routes)
100 .run(([127, 0, 0, 1], self.listen_port))
101 .await;
102
103 Ok(())
104 }
105}
106
107async fn handle_proxy_request(
108 path: warp::path::FullPath,
109 headers: warp::http::HeaderMap,
110 body: Value,
111 target_url: String,
112 client: Client,
113 message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
114 proxy_state: Option<ProxyState>,
115) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
116 let mut header_map = HashMap::new();
118 for (name, value) in headers.iter() {
119 if let Ok(value_str) = value.to_str() {
120 header_map.insert(name.to_string(), value_str.to_string());
121 }
122 }
123
124 let request_message = JsonRpcMessage {
126 id: body.get("id").cloned(),
127 method: body
128 .get("method")
129 .and_then(|m| m.as_str())
130 .map(String::from),
131 params: body.get("params").cloned(),
132 result: None,
133 error: None,
134 timestamp: std::time::SystemTime::now(),
135 direction: MessageDirection::Request,
136 transport: TransportType::Http,
137 headers: Some(header_map.clone()),
138 };
139
140 let _ = message_sender.send(request_message.clone());
141
142 if let Some(ref state) = proxy_state {
144 let should_intercept = if let Ok(app_mode) = state.app_mode.lock() {
145 matches!(*app_mode, AppMode::Paused)
146 } else {
147 false
148 };
149
150 if should_intercept {
151 let (decision_sender, decision_receiver) = oneshot::channel();
153
154 let pending_request = PendingRequest {
156 id: Uuid::new_v4().to_string(),
157 original_request: request_message,
158 modified_request: None,
159 modified_headers: None,
160 decision_sender,
161 };
162
163 let _ = state.pending_sender.send(pending_request);
165
166 let decision = tokio::time::timeout(
168 std::time::Duration::from_secs(300), decision_receiver,
170 )
171 .await;
172
173 return match decision {
174 Ok(Ok(ProxyDecision::Allow(modified_json, modified_headers))) => {
175 let request_body = modified_json.unwrap_or(body);
177
178 let final_headers = if let Some(mod_headers) = modified_headers {
180 let mut header_map = warp::http::HeaderMap::new();
182 for (key, value) in mod_headers {
183 if let (Ok(header_name), Ok(header_value)) = (
184 warp::http::header::HeaderName::from_bytes(key.as_bytes()),
185 warp::http::header::HeaderValue::from_str(&value),
186 ) {
187 header_map.insert(header_name, header_value);
188 }
189 }
190 header_map
191 } else {
192 headers
193 };
194
195 forward_request(
196 final_headers,
197 request_body,
198 format!("{}{}", target_url, path.as_str()),
199 client,
200 message_sender,
201 )
202 .await
203 }
204 Ok(Ok(ProxyDecision::Block)) => {
205 Ok(Box::new(warp::reply::with_status(
207 warp::reply::json(&serde_json::json!({
208 "jsonrpc": "2.0",
209 "id": body.get("id"),
210 "error": {
211 "code": -32603,
212 "message": "Request blocked by user"
213 }
214 })),
215 warp::http::StatusCode::OK,
216 )))
217 }
218 Ok(Ok(ProxyDecision::Complete(response_json))) => {
219 let response_message = JsonRpcMessage {
221 id: response_json.get("id").cloned(),
222 method: None,
223 params: None,
224 result: response_json.get("result").cloned(),
225 error: response_json.get("error").cloned(),
226 timestamp: std::time::SystemTime::now(),
227 direction: MessageDirection::Response,
228 transport: TransportType::Http,
229 headers: Some(HashMap::from([
230 ("content-type".to_string(), "application/json".to_string()),
231 ("x-proxy-completed".to_string(), "true".to_string()),
232 ])),
233 };
234
235 let _ = message_sender.send(response_message);
236
237 Ok(Box::new(warp::reply::with_status(
239 warp::reply::json(&response_json),
240 warp::http::StatusCode::OK,
241 )))
242 }
243 Ok(Err(_)) | Err(_) => {
244 Ok(Box::new(warp::reply::with_status(
246 warp::reply::json(&serde_json::json!({
247 "jsonrpc": "2.0",
248 "id": body.get("id"),
249 "error": {
250 "code": -32603,
251 "message": "Request timed out waiting for user decision"
252 }
253 })),
254 warp::http::StatusCode::REQUEST_TIMEOUT,
255 )))
256 }
257 };
258 }
259 }
260
261 forward_request(
263 headers,
264 body,
265 format!("{}{}", target_url, path.as_str()),
266 client,
267 message_sender,
268 )
269 .await
270}
271
272async fn forward_request(
273 headers: warp::http::HeaderMap,
274 body: Value,
275 target_url: String,
276 client: Client,
277 message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
278) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
279 let mut request_builder = client.post(&target_url).json(&body);
281
282 for (name, value) in headers.iter() {
284 if should_forward_header(name.as_str()) {
285 request_builder = request_builder.header(name, value);
286 }
287 }
288
289 match request_builder.send().await {
290 Ok(response) => {
291 let status = response.status();
292 let response_headers = response.headers().clone();
293
294 let mut response_header_map = HashMap::new();
296 for (name, value) in response_headers.iter() {
297 if let Ok(value_str) = value.to_str() {
298 response_header_map.insert(name.to_string(), value_str.to_string());
299 }
300 }
301
302 match response.text().await {
304 Ok(response_text) => {
305 match serde_json::from_str::<Value>(&response_text) {
307 Ok(response_body) => {
308 let response_message = JsonRpcMessage {
310 id: response_body.get("id").cloned(),
311 method: None,
312 params: None,
313 result: response_body.get("result").cloned(),
314 error: response_body.get("error").cloned(),
315 timestamp: std::time::SystemTime::now(),
316 direction: MessageDirection::Response,
317 transport: TransportType::Http,
318 headers: Some(response_header_map.clone()),
319 };
320
321 let _ = message_sender.send(response_message);
322
323 Ok(Box::new(warp::reply::with_status(
325 warp::reply::json(&response_body),
326 status,
327 )))
328 }
329 Err(parse_error) => {
330 let content_type = response_header_map
332 .get("content-type")
333 .unwrap_or(&"unknown".to_string())
334 .clone();
335
336 let has_null_bytes = response_text.contains('\0');
338 let is_empty = response_text.trim().is_empty();
339
340 let content_preview = if has_null_bytes {
342 let bytes: Vec<u8> = response_text.bytes().take(50).collect();
344 format!("Binary data: {:02x?}...", bytes)
345 } else if response_text.trim().starts_with('{')
346 || response_text.trim().starts_with('[')
347 {
348 if response_text.len() > 500 {
350 format!("{}...", &response_text[..500])
351 } else {
352 response_text.clone()
353 }
354 } else if response_text.len() > 200 {
355 format!("{}...", &response_text[..200])
356 } else {
357 response_text.clone()
358 };
359
360 let issue_type = if is_empty {
362 "empty_response"
363 } else if has_null_bytes {
364 "binary_data"
365 } else if content_type.contains("text/html") {
366 "html_response"
367 } else if content_type.contains("application/json") {
368 "malformed_json"
369 } else {
370 "unknown_format"
371 };
372
373 let error_message = JsonRpcMessage {
374 id: body.get("id").cloned(),
375 method: None,
376 params: None,
377 result: None,
378 error: Some(serde_json::json!({
379 "code": -32700,
380 "message": format!("Invalid JSON response from server (HTTP {})", status),
381 "data": {
382 "issue_type": issue_type,
383 "content_type": content_type,
384 "response_preview": content_preview,
385 "response_length": response_text.len(),
386 "has_null_bytes": has_null_bytes,
387 "parse_error": parse_error.to_string(),
388 "target_url": target_url
389 }
390 })),
391 timestamp: std::time::SystemTime::now(),
392 direction: MessageDirection::Response,
393 transport: TransportType::Http,
394 headers: Some(response_header_map.clone()),
395 };
396
397 let _ = message_sender.send(error_message);
398
399 Ok(Box::new(warp::reply::with_status(
401 warp::reply::json(&serde_json::json!({
402 "jsonrpc": "2.0",
403 "id": body.get("id"),
404 "error": {
405 "code": -32700,
406 "message": format!("Invalid JSON response from server (HTTP {})", status),
407 "data": {
408 "issue_type": issue_type,
409 "content_type": content_type,
410 "has_null_bytes": has_null_bytes
411 }
412 }
413 })),
414 warp::http::StatusCode::OK, )))
416 }
417 }
418 }
419 Err(_e) => {
420 let error_message = JsonRpcMessage {
422 id: body.get("id").cloned(),
423 method: None,
424 params: None,
425 result: None,
426 error: Some(serde_json::json!({
427 "code": -32603,
428 "message": "Internal error - failed to read response"
429 })),
430 timestamp: std::time::SystemTime::now(),
431 direction: MessageDirection::Response,
432 transport: TransportType::Http,
433 headers: Some(response_header_map),
434 };
435
436 let _ = message_sender.send(error_message);
437
438 Ok(Box::new(warp::reply::with_status(
439 warp::reply::json(&serde_json::json!({
440 "jsonrpc": "2.0",
441 "id": body.get("id"),
442 "error": {
443 "code": -32603,
444 "message": "Internal error - failed to read response"
445 }
446 })),
447 warp::http::StatusCode::INTERNAL_SERVER_ERROR,
448 )))
449 }
450 }
451 }
452 Err(_e) => {
453 let error_message = JsonRpcMessage {
455 id: body.get("id").cloned(),
456 method: None,
457 params: None,
458 result: None,
459 error: Some(serde_json::json!({
460 "code": -32603,
461 "message": "Failed to connect to target server"
462 })),
463 timestamp: std::time::SystemTime::now(),
464 direction: MessageDirection::Response,
465 transport: TransportType::Http,
466 headers: None,
467 };
468
469 let _ = message_sender.send(error_message);
470
471 Ok(Box::new(warp::reply::with_status(
472 warp::reply::json(&serde_json::json!({
473 "jsonrpc": "2.0",
474 "id": body.get("id"),
475 "error": {
476 "code": -32603,
477 "message": "Failed to connect to target server"
478 }
479 })),
480 warp::http::StatusCode::BAD_GATEWAY,
481 )))
482 }
483 }
484}
485
/// Reports whether a request header may be copied onto the upstream request.
///
/// Returns `false` for `host`, `content-length`, `transfer-encoding` and `connection`
/// (compared case-insensitively) and `true` for every other name.
fn should_forward_header(header_name: &str) -> bool {
    const NOT_FORWARDED: [&str; 4] = ["host", "content-length", "transfer-encoding", "connection"];

    !NOT_FORWARDED
        .iter()
        .any(|blocked| header_name.eq_ignore_ascii_case(blocked))
}