feophantlib/processor/
client_processor.rs

1use bytes::Bytes;
2use thiserror::Error;
3
4use super::super::engine::transactions::{TransactionManager, TransactionManagerError};
5use super::super::engine::{Engine, EngineError};
6use super::ssl_and_gssapi_parser;
7use super::startup_parser;
8use crate::codec::{NetworkFrame, NetworkFrameError};
9use crate::constants::{PgErrorCodes, PgErrorLevels};
10
11pub struct ClientProcessor {
12    engine: Engine,
13    transaction_manager: TransactionManager,
14}
15
16impl ClientProcessor {
17    pub fn new(engine: Engine, transaction_manager: TransactionManager) -> ClientProcessor {
18        ClientProcessor {
19            engine,
20            transaction_manager,
21        }
22    }
23
24    pub async fn process(
25        &mut self,
26        frame: NetworkFrame,
27    ) -> Result<Vec<NetworkFrame>, ClientProcessorError> {
28        let payload_buff: &[u8] = &frame.payload;
29
30        //Startup stuff
31        if frame.message_type == 0 && ssl_and_gssapi_parser::is_ssl_request(payload_buff) {
32            debug!("Got a SSL Request, no security here... yet");
33            return Ok(vec![NetworkFrame::new(0, Bytes::from_static(b"N"))]);
34        } else if frame.message_type == 0 && ssl_and_gssapi_parser::is_gssapi_request(payload_buff)
35        {
36            debug!("Got a GSSAPI Request, no security here... yet");
37            return Ok(vec![NetworkFrame::new(0, Bytes::from_static(b"N"))]);
38        } else if frame.message_type == 0 {
39            debug!("Got a startup message!");
40            let message = startup_parser::parse_startup(payload_buff)
41                .map_err(|_| ClientProcessorError::BadStartup())?;
42
43            //TODO: Upon getting a startup message we should be checking for a database and user
44            //We should also check for configured authentication methods... maybe later!
45            //   we're just going to let them in so we can get further on message parsing.
46            info!("Just going to let {:?} in", message.get("user"));
47            return Ok(vec![
48                NetworkFrame::authentication_ok(),
49                NetworkFrame::ready_for_query(),
50            ]);
51        }
52
53        //Support basic query
54        if frame.message_type == b'Q' {
55            debug!("Got query {:?}", payload_buff);
56
57            let result = match self.process_single_query(payload_buff).await {
58                Ok(o) => o,
59                Err(e) => {
60                    return Ok(vec![
61                        NetworkFrame::error_response(
62                            PgErrorLevels::Error,
63                            PgErrorCodes::SystemError,
64                            e.to_string(),
65                        ),
66                        NetworkFrame::ready_for_query(),
67                    ]);
68                }
69            };
70
71            return Ok(result);
72        }
73
74        warn!(
75            "Got a message we don't understand yet {}",
76            frame.message_type
77        );
78        Ok(vec![NetworkFrame::error_response(
79            PgErrorLevels::Error,
80            PgErrorCodes::SystemError,
81            "Got an unimplemented message".to_string(),
82        )])
83    }
84
85    async fn process_single_query(
86        &mut self,
87        payload_buff: &[u8],
88    ) -> Result<Vec<NetworkFrame>, ClientProcessorError> {
89        //Convert to utf8
90        let query_str = String::from_utf8(payload_buff.to_vec())?;
91
92        let txid = self.transaction_manager.start_trans().await?;
93
94        let query_res = match self.engine.process_query(txid, query_str).await {
95            Ok(o) => {
96                self.transaction_manager.commit_trans(txid).await?;
97                o
98            }
99            Err(e) => {
100                self.transaction_manager.abort_trans(txid).await?;
101                return Err(ClientProcessorError::EngineError(e));
102            }
103        };
104
105        let mut frames = vec![];
106        if !query_res.columns.is_empty() {
107            frames.push(NetworkFrame::row_description(query_res.columns)?);
108        }
109
110        let results_rows = query_res.rows.len();
111        if !query_res.rows.is_empty() {
112            frames.append(&mut NetworkFrame::data_rows(query_res.rows)?);
113        }
114
115        frames.push(NetworkFrame::command_complete(format!(
116            "SELECT {}",
117            results_rows
118        )));
119
120        frames.push(NetworkFrame::ready_for_query());
121
122        Ok(frames)
123    }
124}
125
126#[derive(Error, Debug)]
127pub enum ClientProcessorError {
128    #[error("Malformed Startup Packet")]
129    BadStartup(),
130    #[error(transparent)]
131    EngineError(#[from] EngineError),
132    #[error(transparent)]
133    NetworkFrameError(#[from] NetworkFrameError),
134    #[error(transparent)]
135    QueryNotUtf8(#[from] std::string::FromUtf8Error),
136    #[error(transparent)]
137    TransactionManagerError(#[from] TransactionManagerError),
138}