lynx_core/proxy/
http_proxy.rs

1use anyhow::{Error, Result};
2use http_body_util::combinators::BoxBody;
3use hyper::body::{Bytes, Incoming};
4use hyper::{Request, Response};
5use sea_orm::{ActiveModelTrait, Set};
6use tracing::{info, trace};
7
8use crate::entities::app_config::{get_app_config, RecordingStatus};
9use crate::entities::request::{self};
10use crate::entities::response;
11use crate::plugins::http_request_plugin::{self, build_proxy_response};
12use crate::proxy_log::message::Message;
13use crate::proxy_log::try_send_message;
14use crate::schedular::get_req_trace_id;
15use crate::server_context::DB;
16
17pub async fn proxy_http_request(req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, Error>>> {
18    info!("proxying http request {:?}", req);
19    let trace_id = get_req_trace_id(&req);
20
21    let headers = req
22        .headers()
23        .iter()
24        .map(|(k, v)| (k.as_str().to_string(), v.to_str().unwrap_or("").to_string()))
25        .collect();
26    let header_size: usize = req
27        .headers()
28        .iter()
29        .map(|(k, v)| k.as_str().len() + v.as_bytes().len())
30        .sum();
31
32    let mut request_active_model = request::ActiveModel {
33        trace_id: Set(trace_id.to_string()),
34        uri: Set(req.uri().to_string()),
35        method: Set(req.method().to_string()),
36        schema: Set(req.uri().scheme_str().unwrap_or("").to_string()),
37        version: Set(format!("{:?}", req.version())),
38        header: Set(Some(headers)),
39        header_size: Set(Some(header_size as u32)),
40        ..Default::default()
41    };
42
43    match http_request_plugin::request(req).await {
44        Ok(proxy_res) => {
45            trace!("origin response: {:?}", proxy_res);
46            request_active_model.set(
47                request::Column::StatusCode,
48                proxy_res.status().as_u16().into(),
49            );
50            let app_config = get_app_config().await;
51            trace!("recording status: {:?}", app_config.recording_status);
52            if matches!(app_config.recording_status, RecordingStatus::StartRecording) {
53                let record = request_active_model.insert(DB.get().unwrap()).await?;
54                let request_id = record.id;
55                try_send_message(Message::add(record));
56                let header_size: usize = proxy_res
57                    .headers()
58                    .iter()
59                    .map(|(k, v)| k.as_str().len() + v.as_bytes().len())
60                    .sum();
61
62                let response = response::ActiveModel {
63                    request_id: Set(request_id),
64                    trace_id: Set(trace_id.to_string()),
65                    header: Set(proxy_res
66                        .headers()
67                        .iter()
68                        .map(|(k, v)| {
69                            (k.as_str().to_string(), v.to_str().unwrap_or("").to_string())
70                        })
71                        .collect()),
72                    header_size: Set(header_size as u32),
73                    ..Default::default()
74                };
75
76                response.insert(DB.get().unwrap()).await?;
77            }
78
79            build_proxy_response(trace_id, proxy_res).await
80        }
81        Err(e) => {
82            trace!("proxy http request error: {:?}", e);
83            let app_config = get_app_config().await;
84
85            if matches!(app_config.recording_status, RecordingStatus::StartRecording) {
86                let record = request_active_model.insert(DB.get().unwrap()).await?;
87                try_send_message(Message::add(record));
88            }
89            Err(e)
90        }
91    }
92}