ocular_protocol/
handlers.rs1use crate::handler::{HandshakeAction, ProtocolHandler};
2use crate::{Direction, ProxyEvent};
3
4pub struct RedisHandler;
7
8impl ProtocolHandler for RedisHandler {
9 fn parse_request(&self, buf: &[u8]) -> Option<String> {
10 crate::parse_resp(buf).ok().flatten().map(|(val, _)| val.to_command_string())
11 }
12 fn parse_response(&self, buf: &[u8]) -> Option<String> {
13 crate::parse_resp(buf).ok().flatten().map(|(val, _)| val.to_command_string())
14 }
15 fn default_port(&self) -> u16 { 6379 }
16}
17
18const MYSQL_HEADER_LEN: usize = 4; const MYSQL_COM_QUERY: u8 = 0x03;
22const MYSQL_COM_STMT_PREPARE: u8 = 0x16;
23const MYSQL_GREETING_PROTOCOL_VERSION: u8 = 10;
24const MYSQL_OK_MARKER: u8 = 0x00;
25
26pub struct MysqlHandler;
27
28impl ProtocolHandler for MysqlHandler {
29 fn parse_request(&self, buf: &[u8]) -> Option<String> {
30 crate::parse_mysql_request(buf).map(|pkt| pkt.to_summary())
31 }
32 fn extract_full_command(&self, buf: &[u8]) -> Option<String> {
33 if buf.len() < 5 { return None; }
34 let payload_len = (buf[0] as usize) | (buf[1] as usize) << 8 | (buf[2] as usize) << 16;
35 if buf.len() < MYSQL_HEADER_LEN + payload_len || payload_len <= 1 { return None; }
36 let cmd = buf[4];
37 if cmd == MYSQL_COM_QUERY || cmd == MYSQL_COM_STMT_PREPARE {
38 let sql = String::from_utf8_lossy(&buf[5..MYSQL_HEADER_LEN + payload_len]);
39 Some(sql.replace(|c: char| c.is_control(), ""))
40 } else {
41 self.parse_request(buf)
42 }
43 }
44 fn parse_response(&self, buf: &[u8]) -> Option<String> {
45 crate::parse_mysql_response(buf).map(|r| r.to_summary())
46 }
47 fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
48 crate::parse_mysql_response(buf).map(|r| r.to_display())
49 }
50 fn needs_response_buffering(&self) -> bool { true }
51 fn response_complete(&self, buf: &[u8]) -> bool {
52 crate::mysql::mysql_response_complete(buf)
53 }
54 fn message_length(&self, buf: &[u8]) -> Option<usize> {
55 if buf.len() < MYSQL_HEADER_LEN { return None; }
56 let pkt_len = (buf[0] as usize) | (buf[1] as usize) << 8 | (buf[2] as usize) << 16;
57 Some(MYSQL_HEADER_LEN + pkt_len)
58 }
59 fn capture_handshake(&self, payload: &[u8], direction: Direction) -> HandshakeAction {
60 if payload.len() < 5 { return HandshakeAction::Skip; }
61 let seq = payload[3];
62 let marker = payload[4];
63 match direction {
64 Direction::Request if seq == 0 => HandshakeAction::Done, Direction::Response if seq == 0 && marker == MYSQL_GREETING_PROTOCOL_VERSION => HandshakeAction::Skip,
66 Direction::Response if marker == MYSQL_OK_MARKER => HandshakeAction::Complete,
67 _ => HandshakeAction::Skip, }
69 }
70 fn default_port(&self) -> u16 { 3306 }
71}
72
73pub struct PostgresHandler;
76
77impl ProtocolHandler for PostgresHandler {
78 fn parse_request(&self, buf: &[u8]) -> Option<String> {
79 crate::postgres::parse_postgres_request(buf)
80 }
81 fn extract_full_command(&self, buf: &[u8]) -> Option<String> {
82 crate::postgres::extract_postgres_full_command(buf)
83 }
84 fn parse_response(&self, buf: &[u8]) -> Option<String> {
85 crate::postgres::parse_postgres_response(buf)
86 }
87 fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
88 crate::postgres::format_postgres_response_detail(buf)
89 }
90 fn needs_response_buffering(&self) -> bool { true }
91 fn response_complete(&self, buf: &[u8]) -> bool {
92 crate::postgres::postgres_response_complete(buf)
93 }
94 fn default_port(&self) -> u16 { 5432 }
95}
96
97pub struct AmqpHandler;
100
101impl ProtocolHandler for AmqpHandler {
102 fn parse_request(&self, buf: &[u8]) -> Option<String> {
103 crate::amqp::parse_amqp_request(buf)
104 }
105 fn parse_response(&self, buf: &[u8]) -> Option<String> {
106 crate::amqp::parse_amqp_response(buf)
107 }
108 fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
109 crate::amqp::format_amqp_response_detail(buf)
110 }
111 fn is_frame_based(&self) -> bool { true }
112 fn default_port(&self) -> u16 { 5672 }
113}
114
115pub struct MongodbHandler;
118
119impl ProtocolHandler for MongodbHandler {
120 fn parse_request(&self, buf: &[u8]) -> Option<String> {
121 crate::mongodb::parse_mongo_request(buf)
122 }
123 fn extract_full_command(&self, buf: &[u8]) -> Option<String> {
124 crate::mongodb::extract_mongo_full_command(buf)
125 }
126 fn parse_response(&self, buf: &[u8]) -> Option<String> {
127 crate::mongodb::parse_mongo_response(buf)
128 }
129 fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
130 crate::mongodb::format_mongo_response_detail(buf)
131 }
132 fn message_length(&self, buf: &[u8]) -> Option<usize> {
133 if buf.len() < 4 { return None; }
134 let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
135 if len > 0 { Some(len) } else { None }
136 }
137 fn default_port(&self) -> u16 { 27017 }
138}
139
140pub struct HttpHandler;
143
144
145impl ProtocolHandler for HttpHandler {
146 fn parse_request(&self, buf: &[u8]) -> Option<String> {
147 crate::http::parse_http_request(buf)
148 }
149 fn extract_full_command(&self, buf: &[u8]) -> Option<String> {
150 crate::http::extract_http_full_command(buf)
151 }
152 fn parse_response(&self, buf: &[u8]) -> Option<String> {
153 crate::http::parse_http_response(buf)
154 }
155 fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
156 crate::http::format_http_response_detail(buf)
157 }
158 fn to_replay_command(&self, ev: &ProxyEvent) -> String {
159 let dest = ev.dest.as_deref().unwrap_or("localhost");
160 let lines: Vec<&str> = ev.full_command.lines().collect();
161 let first_line = lines.first().copied().unwrap_or("");
162 let mut parts = first_line.splitn(2, ' ');
163 let method = parts.next().unwrap_or("GET");
164 let path = parts.next().unwrap_or("/");
165 let url = format!("http://{}{}", dest, path);
166 let mut curl = format!("curl -X {} '{}'", method, url);
167
168 let mut in_headers = false;
169 let mut in_body = false;
170 let mut body = String::new();
171 for line in &lines[1..] {
172 if *line == "[Request Headers]" { in_headers = true; in_body = false; continue; }
173 if *line == "[Request Body]" { in_body = true; in_headers = false; continue; }
174 if line.starts_with('[') && line.ends_with(']') { in_headers = false; in_body = false; continue; }
175 if line.is_empty() { continue; }
176 if in_headers { curl.push_str(&format!(" \\\n -H '{}'", line)); }
177 if in_body { body.push_str(line); }
178 }
179 if !body.is_empty() { curl.push_str(&format!(" \\\n -d '{}'", body)); }
180 curl
181 }
182 fn needs_request_buffering(&self) -> bool { true }
183 fn needs_response_buffering(&self) -> bool { true }
184 fn request_complete(&self, buf: &[u8]) -> bool {
185 crate::http::http_request_complete(buf)
186 }
187 fn response_complete(&self, buf: &[u8]) -> bool {
188 crate::http::http_response_complete(buf)
189 }
190 fn default_port(&self) -> u16 { 9200 }
191}
192
193pub struct MemcachedHandler;
196
197impl ProtocolHandler for MemcachedHandler {
198 fn parse_request(&self, buf: &[u8]) -> Option<String> {
199 crate::memcached::parse_memcached_request(buf)
200 }
201 fn parse_response(&self, buf: &[u8]) -> Option<String> {
202 crate::memcached::parse_memcached_response(buf)
203 }
204 fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
205 crate::memcached::format_memcached_response_detail(buf)
206 }
207 fn needs_request_buffering(&self) -> bool { true }
208 fn needs_response_buffering(&self) -> bool { true }
209 fn request_complete(&self, buf: &[u8]) -> bool {
210 crate::memcached::memcached_request_complete(buf)
211 }
212 fn response_complete(&self, buf: &[u8]) -> bool {
213 crate::memcached::memcached_response_complete(buf)
214 }
215 fn default_port(&self) -> u16 { 11211 }
216}
217
218pub struct KafkaHandler;
221
222impl ProtocolHandler for KafkaHandler {
223 fn parse_request(&self, buf: &[u8]) -> Option<String> {
224 crate::kafka::parse_kafka_request(buf)
225 }
226 fn extract_full_command(&self, buf: &[u8]) -> Option<String> {
227 crate::kafka::extract_kafka_full_command(buf)
228 }
229 fn parse_response(&self, buf: &[u8]) -> Option<String> {
230 crate::kafka::parse_kafka_response(buf)
231 }
232 fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
233 crate::kafka::format_kafka_response_detail(buf)
234 }
235 fn needs_request_buffering(&self) -> bool { true }
236 fn needs_response_buffering(&self) -> bool { true }
237 fn request_complete(&self, buf: &[u8]) -> bool {
238 crate::kafka::kafka_frame_complete(buf)
239 }
240 fn response_complete(&self, buf: &[u8]) -> bool {
241 crate::kafka::kafka_frame_complete(buf)
242 }
243 fn default_port(&self) -> u16 { 9092 }
244}