lynx_core/proxy/
http_proxy.rs1use anyhow::{Error, Result};
2use http_body_util::combinators::BoxBody;
3use hyper::body::{Bytes, Incoming};
4use hyper::{Request, Response};
5use sea_orm::{ActiveModelTrait, Set};
6use tracing::{info, trace};
7
8use crate::entities::app_config::{get_app_config, RecordingStatus};
9use crate::entities::request::{self};
10use crate::entities::response;
11use crate::plugins::http_request_plugin::{self, build_proxy_response};
12use crate::proxy_log::message::Message;
13use crate::proxy_log::try_send_message;
14use crate::schedular::get_req_trace_id;
15use crate::server_context::DB;
16
17pub async fn proxy_http_request(req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, Error>>> {
18 info!("proxying http request {:?}", req);
19 let trace_id = get_req_trace_id(&req);
20
21 let headers = req
22 .headers()
23 .iter()
24 .map(|(k, v)| (k.as_str().to_string(), v.to_str().unwrap_or("").to_string()))
25 .collect();
26 let header_size: usize = req
27 .headers()
28 .iter()
29 .map(|(k, v)| k.as_str().len() + v.as_bytes().len())
30 .sum();
31
32 let mut request_active_model = request::ActiveModel {
33 trace_id: Set(trace_id.to_string()),
34 uri: Set(req.uri().to_string()),
35 method: Set(req.method().to_string()),
36 schema: Set(req.uri().scheme_str().unwrap_or("").to_string()),
37 version: Set(format!("{:?}", req.version())),
38 header: Set(Some(headers)),
39 header_size: Set(Some(header_size as u32)),
40 ..Default::default()
41 };
42
43 match http_request_plugin::request(req).await {
44 Ok(proxy_res) => {
45 trace!("origin response: {:?}", proxy_res);
46 request_active_model.set(
47 request::Column::StatusCode,
48 proxy_res.status().as_u16().into(),
49 );
50 let app_config = get_app_config().await;
51 trace!("recording status: {:?}", app_config.recording_status);
52 if matches!(app_config.recording_status, RecordingStatus::StartRecording) {
53 let record = request_active_model.insert(DB.get().unwrap()).await?;
54 let request_id = record.id;
55 try_send_message(Message::add(record));
56 let header_size: usize = proxy_res
57 .headers()
58 .iter()
59 .map(|(k, v)| k.as_str().len() + v.as_bytes().len())
60 .sum();
61
62 let response = response::ActiveModel {
63 request_id: Set(request_id),
64 trace_id: Set(trace_id.to_string()),
65 header: Set(proxy_res
66 .headers()
67 .iter()
68 .map(|(k, v)| {
69 (k.as_str().to_string(), v.to_str().unwrap_or("").to_string())
70 })
71 .collect()),
72 header_size: Set(header_size as u32),
73 ..Default::default()
74 };
75
76 response.insert(DB.get().unwrap()).await?;
77 }
78
79 build_proxy_response(trace_id, proxy_res).await
80 }
81 Err(e) => {
82 trace!("proxy http request error: {:?}", e);
83 let app_config = get_app_config().await;
84
85 if matches!(app_config.recording_status, RecordingStatus::StartRecording) {
86 let record = request_active_model.insert(DB.get().unwrap()).await?;
87 try_send_message(Message::add(record));
88 }
89 Err(e)
90 }
91 }
92}