feophantlib/processor/
client_processor.rs1use bytes::Bytes;
2use thiserror::Error;
3
4use super::super::engine::transactions::{TransactionManager, TransactionManagerError};
5use super::super::engine::{Engine, EngineError};
6use super::ssl_and_gssapi_parser;
7use super::startup_parser;
8use crate::codec::{NetworkFrame, NetworkFrameError};
9use crate::constants::{PgErrorCodes, PgErrorLevels};
10
11pub struct ClientProcessor {
12 engine: Engine,
13 transaction_manager: TransactionManager,
14}
15
16impl ClientProcessor {
17 pub fn new(engine: Engine, transaction_manager: TransactionManager) -> ClientProcessor {
18 ClientProcessor {
19 engine,
20 transaction_manager,
21 }
22 }
23
24 pub async fn process(
25 &mut self,
26 frame: NetworkFrame,
27 ) -> Result<Vec<NetworkFrame>, ClientProcessorError> {
28 let payload_buff: &[u8] = &frame.payload;
29
30 if frame.message_type == 0 && ssl_and_gssapi_parser::is_ssl_request(payload_buff) {
32 debug!("Got a SSL Request, no security here... yet");
33 return Ok(vec![NetworkFrame::new(0, Bytes::from_static(b"N"))]);
34 } else if frame.message_type == 0 && ssl_and_gssapi_parser::is_gssapi_request(payload_buff)
35 {
36 debug!("Got a GSSAPI Request, no security here... yet");
37 return Ok(vec![NetworkFrame::new(0, Bytes::from_static(b"N"))]);
38 } else if frame.message_type == 0 {
39 debug!("Got a startup message!");
40 let message = startup_parser::parse_startup(payload_buff)
41 .map_err(|_| ClientProcessorError::BadStartup())?;
42
43 info!("Just going to let {:?} in", message.get("user"));
47 return Ok(vec![
48 NetworkFrame::authentication_ok(),
49 NetworkFrame::ready_for_query(),
50 ]);
51 }
52
53 if frame.message_type == b'Q' {
55 debug!("Got query {:?}", payload_buff);
56
57 let result = match self.process_single_query(payload_buff).await {
58 Ok(o) => o,
59 Err(e) => {
60 return Ok(vec![
61 NetworkFrame::error_response(
62 PgErrorLevels::Error,
63 PgErrorCodes::SystemError,
64 e.to_string(),
65 ),
66 NetworkFrame::ready_for_query(),
67 ]);
68 }
69 };
70
71 return Ok(result);
72 }
73
74 warn!(
75 "Got a message we don't understand yet {}",
76 frame.message_type
77 );
78 Ok(vec![NetworkFrame::error_response(
79 PgErrorLevels::Error,
80 PgErrorCodes::SystemError,
81 "Got an unimplemented message".to_string(),
82 )])
83 }
84
85 async fn process_single_query(
86 &mut self,
87 payload_buff: &[u8],
88 ) -> Result<Vec<NetworkFrame>, ClientProcessorError> {
89 let query_str = String::from_utf8(payload_buff.to_vec())?;
91
92 let txid = self.transaction_manager.start_trans().await?;
93
94 let query_res = match self.engine.process_query(txid, query_str).await {
95 Ok(o) => {
96 self.transaction_manager.commit_trans(txid).await?;
97 o
98 }
99 Err(e) => {
100 self.transaction_manager.abort_trans(txid).await?;
101 return Err(ClientProcessorError::EngineError(e));
102 }
103 };
104
105 let mut frames = vec![];
106 if !query_res.columns.is_empty() {
107 frames.push(NetworkFrame::row_description(query_res.columns)?);
108 }
109
110 let results_rows = query_res.rows.len();
111 if !query_res.rows.is_empty() {
112 frames.append(&mut NetworkFrame::data_rows(query_res.rows)?);
113 }
114
115 frames.push(NetworkFrame::command_complete(format!(
116 "SELECT {}",
117 results_rows
118 )));
119
120 frames.push(NetworkFrame::ready_for_query());
121
122 Ok(frames)
123 }
124}
125
126#[derive(Error, Debug)]
127pub enum ClientProcessorError {
128 #[error("Malformed Startup Packet")]
129 BadStartup(),
130 #[error(transparent)]
131 EngineError(#[from] EngineError),
132 #[error(transparent)]
133 NetworkFrameError(#[from] NetworkFrameError),
134 #[error(transparent)]
135 QueryNotUtf8(#[from] std::string::FromUtf8Error),
136 #[error(transparent)]
137 TransactionManagerError(#[from] TransactionManagerError),
138}