1use crate::app::{
2 AppMode, JsonRpcMessage, MessageDirection, PendingRequest, ProxyDecision, TransportType,
3};
4use anyhow::Result;
5use reqwest::Client;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex};
9use uuid::Uuid;
10
11use tokio::sync::{mpsc, oneshot};
12use warp::Filter;
13
14#[derive(Clone)]
16pub struct ProxyState {
17 pub app_mode: Arc<Mutex<AppMode>>,
18 pub pending_sender: mpsc::UnboundedSender<PendingRequest>,
19}
20
21pub struct ProxyServer {
22 listen_port: u16,
23 target_url: String,
24 message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
25 client: Client,
26 proxy_state: Option<ProxyState>,
27}
28
29impl ProxyServer {
30 pub fn new(
31 listen_port: u16,
32 target_url: String,
33 message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
34 ) -> Self {
35 Self {
36 listen_port,
37 target_url,
38 message_sender,
39 client: Client::new(),
40 proxy_state: None,
41 }
42 }
43
44 pub fn with_state(mut self, proxy_state: ProxyState) -> Self {
45 self.proxy_state = Some(proxy_state);
46 self
47 }
48
49 pub async fn start(&self) -> Result<()> {
50 let target_url = self.target_url.clone();
51 let client = self.client.clone();
52 let message_sender = self.message_sender.clone();
53 let proxy_state = self.proxy_state.clone();
54
55 let proxy_route = warp::path::full()
56 .and(warp::post())
57 .and(warp::header::headers_cloned())
58 .and(warp::body::json())
59 .and_then(
60 move |path: warp::path::FullPath, headers: warp::http::HeaderMap, body: Value| {
61 let target_url = target_url.clone();
62 let client = client.clone();
63 let message_sender = message_sender.clone();
64 let proxy_state = proxy_state.clone();
65
66 async move {
67 handle_proxy_request(
68 path,
69 headers,
70 body,
71 target_url,
72 client,
73 message_sender,
74 proxy_state,
75 )
76 .await
77 }
78 },
79 );
80
81 let cors = warp::cors()
82 .allow_any_origin()
83 .allow_headers(vec!["content-type", "authorization"])
84 .allow_methods(vec!["POST", "OPTIONS"]);
85
86 let routes = proxy_route.with(cors);
87
88 warp::serve(routes)
91 .run(([127, 0, 0, 1], self.listen_port))
92 .await;
93
94 Ok(())
95 }
96}
97
98async fn handle_proxy_request(
99 path: warp::path::FullPath,
100 headers: warp::http::HeaderMap,
101 body: Value,
102 target_url: String,
103 client: Client,
104 message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
105 proxy_state: Option<ProxyState>,
106) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
107 let mut header_map = HashMap::new();
109 for (name, value) in headers.iter() {
110 if let Ok(value_str) = value.to_str() {
111 header_map.insert(name.to_string(), value_str.to_string());
112 }
113 }
114
115 let request_message = JsonRpcMessage {
117 id: body.get("id").cloned(),
118 method: body
119 .get("method")
120 .and_then(|m| m.as_str())
121 .map(String::from),
122 params: body.get("params").cloned(),
123 result: None,
124 error: None,
125 timestamp: std::time::SystemTime::now(),
126 direction: MessageDirection::Request,
127 transport: TransportType::Http,
128 headers: Some(header_map.clone()),
129 };
130
131 let _ = message_sender.send(request_message.clone());
132
133 if let Some(ref state) = proxy_state {
135 let should_intercept = if let Ok(app_mode) = state.app_mode.lock() {
136 matches!(*app_mode, AppMode::Paused)
137 } else {
138 false
139 };
140
141 if should_intercept {
142 let (decision_sender, decision_receiver) = oneshot::channel();
144
145 let pending_request = PendingRequest {
147 id: Uuid::new_v4().to_string(),
148 original_request: request_message,
149 modified_request: None,
150 modified_headers: None,
151 decision_sender,
152 };
153
154 let _ = state.pending_sender.send(pending_request);
156
157 let decision = tokio::time::timeout(
159 std::time::Duration::from_secs(300), decision_receiver,
161 )
162 .await;
163
164 return match decision {
165 Ok(Ok(ProxyDecision::Allow(modified_json, modified_headers))) => {
166 let request_body = modified_json.unwrap_or(body);
168
169 let final_headers = if let Some(mod_headers) = modified_headers {
171 let mut header_map = warp::http::HeaderMap::new();
173 for (key, value) in mod_headers {
174 if let (Ok(header_name), Ok(header_value)) = (
175 warp::http::header::HeaderName::from_bytes(key.as_bytes()),
176 warp::http::header::HeaderValue::from_str(&value),
177 ) {
178 header_map.insert(header_name, header_value);
179 }
180 }
181 header_map
182 } else {
183 headers
184 };
185
186 forward_request(
187 final_headers,
188 request_body,
189 format!("{}{}", target_url, path.as_str()),
190 client,
191 message_sender,
192 )
193 .await
194 }
195 Ok(Ok(ProxyDecision::Block)) => {
196 Ok(Box::new(warp::reply::with_status(
198 warp::reply::json(&serde_json::json!({
199 "jsonrpc": "2.0",
200 "id": body.get("id"),
201 "error": {
202 "code": -32603,
203 "message": "Request blocked by user"
204 }
205 })),
206 warp::http::StatusCode::OK,
207 )))
208 }
209 Ok(Ok(ProxyDecision::Complete(response_json))) => {
210 let response_message = JsonRpcMessage {
212 id: response_json.get("id").cloned(),
213 method: None,
214 params: None,
215 result: response_json.get("result").cloned(),
216 error: response_json.get("error").cloned(),
217 timestamp: std::time::SystemTime::now(),
218 direction: MessageDirection::Response,
219 transport: TransportType::Http,
220 headers: Some(HashMap::from([
221 ("content-type".to_string(), "application/json".to_string()),
222 ("x-proxy-completed".to_string(), "true".to_string()),
223 ])),
224 };
225
226 let _ = message_sender.send(response_message);
227
228 Ok(Box::new(warp::reply::with_status(
230 warp::reply::json(&response_json),
231 warp::http::StatusCode::OK,
232 )))
233 }
234 Ok(Err(_)) | Err(_) => {
235 Ok(Box::new(warp::reply::with_status(
237 warp::reply::json(&serde_json::json!({
238 "jsonrpc": "2.0",
239 "id": body.get("id"),
240 "error": {
241 "code": -32603,
242 "message": "Request timed out waiting for user decision"
243 }
244 })),
245 warp::http::StatusCode::REQUEST_TIMEOUT,
246 )))
247 }
248 };
249 }
250 }
251
252 forward_request(
254 headers,
255 body,
256 format!("{}{}", target_url, path.as_str()),
257 client,
258 message_sender,
259 )
260 .await
261}
262
263async fn forward_request(
264 headers: warp::http::HeaderMap,
265 body: Value,
266 target_url: String,
267 client: Client,
268 message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
269) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
270 let mut request_builder = client.post(&target_url).json(&body);
272
273 for (name, value) in headers.iter() {
275 if should_forward_header(name.as_str()) {
276 request_builder = request_builder.header(name, value);
277 }
278 }
279
280 match request_builder.send().await {
281 Ok(response) => {
282 let status = response.status();
283 let response_headers = response.headers().clone();
284
285 let mut response_header_map = HashMap::new();
287 for (name, value) in response_headers.iter() {
288 if let Ok(value_str) = value.to_str() {
289 response_header_map.insert(name.to_string(), value_str.to_string());
290 }
291 }
292
293 match response.text().await {
295 Ok(response_text) => {
296 match serde_json::from_str::<Value>(&response_text) {
298 Ok(response_body) => {
299 let response_message = JsonRpcMessage {
301 id: response_body.get("id").cloned(),
302 method: None,
303 params: None,
304 result: response_body.get("result").cloned(),
305 error: response_body.get("error").cloned(),
306 timestamp: std::time::SystemTime::now(),
307 direction: MessageDirection::Response,
308 transport: TransportType::Http,
309 headers: Some(response_header_map.clone()),
310 };
311
312 let _ = message_sender.send(response_message);
313
314 Ok(Box::new(warp::reply::with_status(
316 warp::reply::json(&response_body),
317 status,
318 )))
319 }
320 Err(parse_error) => {
321 let content_type = response_header_map
323 .get("content-type")
324 .unwrap_or(&"unknown".to_string())
325 .clone();
326
327 let has_null_bytes = response_text.contains('\0');
329 let is_empty = response_text.trim().is_empty();
330
331 let content_preview = if has_null_bytes {
333 let bytes: Vec<u8> = response_text.bytes().take(50).collect();
335 format!("Binary data: {:02x?}...", bytes)
336 } else if response_text.trim().starts_with('{')
337 || response_text.trim().starts_with('[')
338 {
339 if response_text.len() > 500 {
341 format!("{}...", &response_text[..500])
342 } else {
343 response_text.clone()
344 }
345 } else if response_text.len() > 200 {
346 format!("{}...", &response_text[..200])
347 } else {
348 response_text.clone()
349 };
350
351 let issue_type = if is_empty {
353 "empty_response"
354 } else if has_null_bytes {
355 "binary_data"
356 } else if content_type.contains("text/html") {
357 "html_response"
358 } else if content_type.contains("application/json") {
359 "malformed_json"
360 } else {
361 "unknown_format"
362 };
363
364 let error_message = JsonRpcMessage {
365 id: body.get("id").cloned(),
366 method: None,
367 params: None,
368 result: None,
369 error: Some(serde_json::json!({
370 "code": -32700,
371 "message": format!("Invalid JSON response from server (HTTP {})", status),
372 "data": {
373 "issue_type": issue_type,
374 "content_type": content_type,
375 "response_preview": content_preview,
376 "response_length": response_text.len(),
377 "has_null_bytes": has_null_bytes,
378 "parse_error": parse_error.to_string(),
379 "target_url": target_url
380 }
381 })),
382 timestamp: std::time::SystemTime::now(),
383 direction: MessageDirection::Response,
384 transport: TransportType::Http,
385 headers: Some(response_header_map.clone()),
386 };
387
388 let _ = message_sender.send(error_message);
389
390 Ok(Box::new(warp::reply::with_status(
392 warp::reply::json(&serde_json::json!({
393 "jsonrpc": "2.0",
394 "id": body.get("id"),
395 "error": {
396 "code": -32700,
397 "message": format!("Invalid JSON response from server (HTTP {})", status),
398 "data": {
399 "issue_type": issue_type,
400 "content_type": content_type,
401 "has_null_bytes": has_null_bytes
402 }
403 }
404 })),
405 warp::http::StatusCode::OK, )))
407 }
408 }
409 }
410 Err(_e) => {
411 let error_message = JsonRpcMessage {
413 id: body.get("id").cloned(),
414 method: None,
415 params: None,
416 result: None,
417 error: Some(serde_json::json!({
418 "code": -32603,
419 "message": "Internal error - failed to read response"
420 })),
421 timestamp: std::time::SystemTime::now(),
422 direction: MessageDirection::Response,
423 transport: TransportType::Http,
424 headers: Some(response_header_map),
425 };
426
427 let _ = message_sender.send(error_message);
428
429 Ok(Box::new(warp::reply::with_status(
430 warp::reply::json(&serde_json::json!({
431 "jsonrpc": "2.0",
432 "id": body.get("id"),
433 "error": {
434 "code": -32603,
435 "message": "Internal error - failed to read response"
436 }
437 })),
438 warp::http::StatusCode::INTERNAL_SERVER_ERROR,
439 )))
440 }
441 }
442 }
443 Err(_e) => {
444 let error_message = JsonRpcMessage {
446 id: body.get("id").cloned(),
447 method: None,
448 params: None,
449 result: None,
450 error: Some(serde_json::json!({
451 "code": -32603,
452 "message": "Failed to connect to target server"
453 })),
454 timestamp: std::time::SystemTime::now(),
455 direction: MessageDirection::Response,
456 transport: TransportType::Http,
457 headers: None,
458 };
459
460 let _ = message_sender.send(error_message);
461
462 Ok(Box::new(warp::reply::with_status(
463 warp::reply::json(&serde_json::json!({
464 "jsonrpc": "2.0",
465 "id": body.get("id"),
466 "error": {
467 "code": -32603,
468 "message": "Failed to connect to target server"
469 }
470 })),
471 warp::http::StatusCode::BAD_GATEWAY,
472 )))
473 }
474 }
475}
476
477fn should_forward_header(header_name: &str) -> bool {
478 !matches!(
479 header_name.to_lowercase().as_str(),
480 "host" | "content-length" | "transfer-encoding" | "connection"
481 )
482}