1use crate::app::{
2 AppMode, JsonRpcMessage, MessageDirection, PendingRequest, ProxyDecision, TransportType,
3};
4use anyhow::Result;
5use reqwest::Client;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex};
9use uuid::Uuid;
10
11use tokio::sync::{mpsc, oneshot};
12use warp::Filter;
13
14#[derive(Clone)]
16pub struct ProxyState {
17 pub app_mode: Arc<Mutex<AppMode>>,
18 pub pending_sender: mpsc::UnboundedSender<PendingRequest>,
19}
20
21pub struct ProxyServer {
22 listen_port: u16,
23 target_url: String,
24 message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
25 client: Client,
26 proxy_state: Option<ProxyState>,
27}
28
29impl ProxyServer {
30 pub fn new(
31 listen_port: u16,
32 target_url: String,
33 message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
34 ) -> Self {
35 Self {
36 listen_port,
37 target_url,
38 message_sender,
39 client: Client::new(),
40 proxy_state: None,
41 }
42 }
43
44 pub fn with_state(mut self, proxy_state: ProxyState) -> Self {
45 self.proxy_state = Some(proxy_state);
46 self
47 }
48
49 pub async fn start(&self) -> Result<()> {
50 let target_url = self.target_url.clone();
51 let client = self.client.clone();
52 let message_sender = self.message_sender.clone();
53 let proxy_state = self.proxy_state.clone();
54
55 let proxy_route = warp::path::end()
56 .and(warp::post())
57 .and(warp::header::headers_cloned())
58 .and(warp::body::json())
59 .and_then(move |headers: warp::http::HeaderMap, body: Value| {
60 let target_url = target_url.clone();
61 let client = client.clone();
62 let message_sender = message_sender.clone();
63 let proxy_state = proxy_state.clone();
64
65 async move {
66 handle_proxy_request(
67 headers,
68 body,
69 target_url,
70 client,
71 message_sender,
72 proxy_state,
73 )
74 .await
75 }
76 });
77
78 let cors = warp::cors()
79 .allow_any_origin()
80 .allow_headers(vec!["content-type", "authorization"])
81 .allow_methods(vec!["POST", "OPTIONS"]);
82
83 let routes = proxy_route.with(cors);
84
85 warp::serve(routes)
88 .run(([127, 0, 0, 1], self.listen_port))
89 .await;
90
91 Ok(())
92 }
93}
94
95async fn handle_proxy_request(
96 headers: warp::http::HeaderMap,
97 body: Value,
98 target_url: String,
99 client: Client,
100 message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
101 proxy_state: Option<ProxyState>,
102) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
103 let mut header_map = HashMap::new();
105 for (name, value) in headers.iter() {
106 if let Ok(value_str) = value.to_str() {
107 header_map.insert(name.to_string(), value_str.to_string());
108 }
109 }
110
111 let request_message = JsonRpcMessage {
113 id: body.get("id").cloned(),
114 method: body
115 .get("method")
116 .and_then(|m| m.as_str())
117 .map(String::from),
118 params: body.get("params").cloned(),
119 result: None,
120 error: None,
121 timestamp: std::time::SystemTime::now(),
122 direction: MessageDirection::Request,
123 transport: TransportType::Http,
124 headers: Some(header_map.clone()),
125 };
126
127 let _ = message_sender.send(request_message.clone());
128
129 if let Some(ref state) = proxy_state {
131 let should_intercept = if let Ok(app_mode) = state.app_mode.lock() {
132 matches!(*app_mode, AppMode::Paused)
133 } else {
134 false
135 };
136
137 if should_intercept {
138 let (decision_sender, decision_receiver) = oneshot::channel();
140
141 let pending_request = PendingRequest {
143 id: Uuid::new_v4().to_string(),
144 original_request: request_message,
145 modified_request: None,
146 modified_headers: None,
147 decision_sender,
148 };
149
150 let _ = state.pending_sender.send(pending_request);
152
153 let decision = tokio::time::timeout(
155 std::time::Duration::from_secs(300), decision_receiver,
157 )
158 .await;
159
160 return match decision {
161 Ok(Ok(ProxyDecision::Allow(modified_json, modified_headers))) => {
162 let request_body = modified_json.unwrap_or(body);
164
165 let final_headers = if let Some(mod_headers) = modified_headers {
167 let mut header_map = warp::http::HeaderMap::new();
169 for (key, value) in mod_headers {
170 if let (Ok(header_name), Ok(header_value)) = (
171 warp::http::header::HeaderName::from_bytes(key.as_bytes()),
172 warp::http::header::HeaderValue::from_str(&value),
173 ) {
174 header_map.insert(header_name, header_value);
175 }
176 }
177 header_map
178 } else {
179 headers
180 };
181
182 forward_request(
183 final_headers,
184 request_body,
185 target_url,
186 client,
187 message_sender,
188 )
189 .await
190 }
191 Ok(Ok(ProxyDecision::Block)) => {
192 Ok(Box::new(warp::reply::with_status(
194 warp::reply::json(&serde_json::json!({
195 "jsonrpc": "2.0",
196 "id": body.get("id"),
197 "error": {
198 "code": -32603,
199 "message": "Request blocked by user"
200 }
201 })),
202 warp::http::StatusCode::OK,
203 )))
204 }
205 Ok(Ok(ProxyDecision::Complete(response_json))) => {
206 let response_message = JsonRpcMessage {
208 id: response_json.get("id").cloned(),
209 method: None,
210 params: None,
211 result: response_json.get("result").cloned(),
212 error: response_json.get("error").cloned(),
213 timestamp: std::time::SystemTime::now(),
214 direction: MessageDirection::Response,
215 transport: TransportType::Http,
216 headers: Some(HashMap::from([
217 ("content-type".to_string(), "application/json".to_string()),
218 ("x-proxy-completed".to_string(), "true".to_string()),
219 ])),
220 };
221
222 let _ = message_sender.send(response_message);
223
224 Ok(Box::new(warp::reply::with_status(
226 warp::reply::json(&response_json),
227 warp::http::StatusCode::OK,
228 )))
229 }
230 Ok(Err(_)) | Err(_) => {
231 Ok(Box::new(warp::reply::with_status(
233 warp::reply::json(&serde_json::json!({
234 "jsonrpc": "2.0",
235 "id": body.get("id"),
236 "error": {
237 "code": -32603,
238 "message": "Request timed out waiting for user decision"
239 }
240 })),
241 warp::http::StatusCode::REQUEST_TIMEOUT,
242 )))
243 }
244 };
245 }
246 }
247
248 forward_request(headers, body, target_url, client, message_sender).await
250}
251
252async fn forward_request(
253 headers: warp::http::HeaderMap,
254 body: Value,
255 target_url: String,
256 client: Client,
257 message_sender: mpsc::UnboundedSender<JsonRpcMessage>,
258) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
259 let mut request_builder = client.post(&target_url).json(&body);
261
262 for (name, value) in headers.iter() {
264 if should_forward_header(name.as_str()) {
265 request_builder = request_builder.header(name, value);
266 }
267 }
268
269 match request_builder.send().await {
270 Ok(response) => {
271 let status = response.status();
272 let response_headers = response.headers().clone();
273
274 let mut response_header_map = HashMap::new();
276 for (name, value) in response_headers.iter() {
277 if let Ok(value_str) = value.to_str() {
278 response_header_map.insert(name.to_string(), value_str.to_string());
279 }
280 }
281
282 match response.text().await {
284 Ok(response_text) => {
285 match serde_json::from_str::<Value>(&response_text) {
287 Ok(response_body) => {
288 let response_message = JsonRpcMessage {
290 id: response_body.get("id").cloned(),
291 method: None,
292 params: None,
293 result: response_body.get("result").cloned(),
294 error: response_body.get("error").cloned(),
295 timestamp: std::time::SystemTime::now(),
296 direction: MessageDirection::Response,
297 transport: TransportType::Http,
298 headers: Some(response_header_map.clone()),
299 };
300
301 let _ = message_sender.send(response_message);
302
303 Ok(Box::new(warp::reply::with_status(
305 warp::reply::json(&response_body),
306 status,
307 )))
308 }
309 Err(parse_error) => {
310 let content_type = response_header_map
312 .get("content-type")
313 .unwrap_or(&"unknown".to_string())
314 .clone();
315
316 let has_null_bytes = response_text.contains('\0');
318 let is_empty = response_text.trim().is_empty();
319
320 let content_preview = if has_null_bytes {
322 let bytes: Vec<u8> = response_text.bytes().take(50).collect();
324 format!("Binary data: {:02x?}...", bytes)
325 } else if response_text.trim().starts_with('{')
326 || response_text.trim().starts_with('[')
327 {
328 if response_text.len() > 500 {
330 format!("{}...", &response_text[..500])
331 } else {
332 response_text.clone()
333 }
334 } else if response_text.len() > 200 {
335 format!("{}...", &response_text[..200])
336 } else {
337 response_text.clone()
338 };
339
340 let issue_type = if is_empty {
342 "empty_response"
343 } else if has_null_bytes {
344 "binary_data"
345 } else if content_type.contains("text/html") {
346 "html_response"
347 } else if content_type.contains("application/json") {
348 "malformed_json"
349 } else {
350 "unknown_format"
351 };
352
353 let error_message = JsonRpcMessage {
354 id: body.get("id").cloned(),
355 method: None,
356 params: None,
357 result: None,
358 error: Some(serde_json::json!({
359 "code": -32700,
360 "message": format!("Invalid JSON response from server (HTTP {})", status),
361 "data": {
362 "issue_type": issue_type,
363 "content_type": content_type,
364 "response_preview": content_preview,
365 "response_length": response_text.len(),
366 "has_null_bytes": has_null_bytes,
367 "parse_error": parse_error.to_string(),
368 "target_url": target_url
369 }
370 })),
371 timestamp: std::time::SystemTime::now(),
372 direction: MessageDirection::Response,
373 transport: TransportType::Http,
374 headers: Some(response_header_map.clone()),
375 };
376
377 let _ = message_sender.send(error_message);
378
379 Ok(Box::new(warp::reply::with_status(
381 warp::reply::json(&serde_json::json!({
382 "jsonrpc": "2.0",
383 "id": body.get("id"),
384 "error": {
385 "code": -32700,
386 "message": format!("Invalid JSON response from server (HTTP {})", status),
387 "data": {
388 "issue_type": issue_type,
389 "content_type": content_type,
390 "has_null_bytes": has_null_bytes
391 }
392 }
393 })),
394 warp::http::StatusCode::OK, )))
396 }
397 }
398 }
399 Err(_e) => {
400 let error_message = JsonRpcMessage {
402 id: body.get("id").cloned(),
403 method: None,
404 params: None,
405 result: None,
406 error: Some(serde_json::json!({
407 "code": -32603,
408 "message": "Internal error - failed to read response"
409 })),
410 timestamp: std::time::SystemTime::now(),
411 direction: MessageDirection::Response,
412 transport: TransportType::Http,
413 headers: Some(response_header_map),
414 };
415
416 let _ = message_sender.send(error_message);
417
418 Ok(Box::new(warp::reply::with_status(
419 warp::reply::json(&serde_json::json!({
420 "jsonrpc": "2.0",
421 "id": body.get("id"),
422 "error": {
423 "code": -32603,
424 "message": "Internal error - failed to read response"
425 }
426 })),
427 warp::http::StatusCode::INTERNAL_SERVER_ERROR,
428 )))
429 }
430 }
431 }
432 Err(_e) => {
433 let error_message = JsonRpcMessage {
435 id: body.get("id").cloned(),
436 method: None,
437 params: None,
438 result: None,
439 error: Some(serde_json::json!({
440 "code": -32603,
441 "message": "Failed to connect to target server"
442 })),
443 timestamp: std::time::SystemTime::now(),
444 direction: MessageDirection::Response,
445 transport: TransportType::Http,
446 headers: None,
447 };
448
449 let _ = message_sender.send(error_message);
450
451 Ok(Box::new(warp::reply::with_status(
452 warp::reply::json(&serde_json::json!({
453 "jsonrpc": "2.0",
454 "id": body.get("id"),
455 "error": {
456 "code": -32603,
457 "message": "Failed to connect to target server"
458 }
459 })),
460 warp::http::StatusCode::BAD_GATEWAY,
461 )))
462 }
463 }
464}
465
/// Returns `true` if a header may be copied onto the proxied request.
///
/// `host` and `content-length` are regenerated for the outgoing request, and
/// the remaining entries are connection-specific fields that describe a single
/// hop and must not be relayed by an intermediary. The comparison is ASCII
/// case-insensitive and does not allocate.
fn should_forward_header(header_name: &str) -> bool {
    const NOT_FORWARDED: [&str; 8] = [
        "host",
        "content-length",
        "connection",
        "keep-alive",
        "proxy-connection",
        "te",
        "transfer-encoding",
        "upgrade",
    ];

    !NOT_FORWARDED
        .iter()
        .any(|blocked| header_name.eq_ignore_ascii_case(blocked))
}