Skip to main content

ocular_protocol/
handlers.rs

1use crate::handler::{HandshakeAction, ProtocolHandler};
2use crate::{Direction, ProxyEvent};
3
4// ─── Redis ──────────────────────────────────────────────────────────────────
5
6pub struct RedisHandler;
7
8impl ProtocolHandler for RedisHandler {
9    fn parse_request(&self, buf: &[u8]) -> Option<String> {
10        crate::parse_resp(buf).ok().flatten().map(|(val, _)| val.to_command_string())
11    }
12    fn parse_response(&self, buf: &[u8]) -> Option<String> {
13        crate::parse_resp(buf).ok().flatten().map(|(val, _)| val.to_command_string())
14    }
15    fn default_port(&self) -> u16 { 6379 }
16}
17
18// ─── MySQL ──────────────────────────────────────────────────────────────────
19
20const MYSQL_HEADER_LEN: usize = 4; // 3-byte length + 1-byte sequence
21const MYSQL_COM_QUERY: u8 = 0x03;
22const MYSQL_COM_STMT_PREPARE: u8 = 0x16;
23const MYSQL_GREETING_PROTOCOL_VERSION: u8 = 10;
24const MYSQL_OK_MARKER: u8 = 0x00;
25
26pub struct MysqlHandler;
27
28impl ProtocolHandler for MysqlHandler {
29    fn parse_request(&self, buf: &[u8]) -> Option<String> {
30        crate::parse_mysql_request(buf).map(|pkt| pkt.to_summary())
31    }
32    fn extract_full_command(&self, buf: &[u8]) -> Option<String> {
33        if buf.len() < 5 { return None; }
34        let payload_len = (buf[0] as usize) | (buf[1] as usize) << 8 | (buf[2] as usize) << 16;
35        if buf.len() < MYSQL_HEADER_LEN + payload_len || payload_len <= 1 { return None; }
36        let cmd = buf[4];
37        if cmd == MYSQL_COM_QUERY || cmd == MYSQL_COM_STMT_PREPARE {
38            let sql = String::from_utf8_lossy(&buf[5..MYSQL_HEADER_LEN + payload_len]);
39            Some(sql.replace(|c: char| c.is_control(), ""))
40        } else {
41            self.parse_request(buf)
42        }
43    }
44    fn parse_response(&self, buf: &[u8]) -> Option<String> {
45        crate::parse_mysql_response(buf).map(|r| r.to_summary())
46    }
47    fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
48        crate::parse_mysql_response(buf).map(|r| r.to_display())
49    }
50    fn needs_response_buffering(&self) -> bool { true }
51    fn response_complete(&self, buf: &[u8]) -> bool {
52        crate::mysql::mysql_response_complete(buf)
53    }
54    fn message_length(&self, buf: &[u8]) -> Option<usize> {
55        if buf.len() < MYSQL_HEADER_LEN { return None; }
56        let pkt_len = (buf[0] as usize) | (buf[1] as usize) << 8 | (buf[2] as usize) << 16;
57        Some(MYSQL_HEADER_LEN + pkt_len)
58    }
59    fn capture_handshake(&self, payload: &[u8], direction: Direction) -> HandshakeAction {
60        if payload.len() < 5 { return HandshakeAction::Skip; }
61        let seq = payload[3];
62        let marker = payload[4];
63        match direction {
64            Direction::Request if seq == 0 => HandshakeAction::Done, // real command
65            Direction::Response if seq == 0 && marker == MYSQL_GREETING_PROTOCOL_VERSION => HandshakeAction::Skip,
66            Direction::Response if marker == MYSQL_OK_MARKER => HandshakeAction::Complete,
67            _ => HandshakeAction::Skip, // auth exchange
68        }
69    }
70    fn default_port(&self) -> u16 { 3306 }
71}
72
73// ─── PostgreSQL ─────────────────────────────────────────────────────────────
74
75pub struct PostgresHandler;
76
77impl ProtocolHandler for PostgresHandler {
78    fn parse_request(&self, buf: &[u8]) -> Option<String> {
79        crate::postgres::parse_postgres_request(buf)
80    }
81    fn extract_full_command(&self, buf: &[u8]) -> Option<String> {
82        crate::postgres::extract_postgres_full_command(buf)
83    }
84    fn parse_response(&self, buf: &[u8]) -> Option<String> {
85        crate::postgres::parse_postgres_response(buf)
86    }
87    fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
88        crate::postgres::format_postgres_response_detail(buf)
89    }
90    fn needs_response_buffering(&self) -> bool { true }
91    fn response_complete(&self, buf: &[u8]) -> bool {
92        crate::postgres::postgres_response_complete(buf)
93    }
94    fn default_port(&self) -> u16 { 5432 }
95}
96
97// ─── AMQP ───────────────────────────────────────────────────────────────────
98
99pub struct AmqpHandler;
100
101impl ProtocolHandler for AmqpHandler {
102    fn parse_request(&self, buf: &[u8]) -> Option<String> {
103        crate::amqp::parse_amqp_request(buf)
104    }
105    fn parse_response(&self, buf: &[u8]) -> Option<String> {
106        crate::amqp::parse_amqp_response(buf)
107    }
108    fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
109        crate::amqp::format_amqp_response_detail(buf)
110    }
111    fn is_frame_based(&self) -> bool { true }
112    fn default_port(&self) -> u16 { 5672 }
113}
114
115// ─── MongoDB ────────────────────────────────────────────────────────────────
116
117pub struct MongodbHandler;
118
119impl ProtocolHandler for MongodbHandler {
120    fn parse_request(&self, buf: &[u8]) -> Option<String> {
121        crate::mongodb::parse_mongo_request(buf)
122    }
123    fn extract_full_command(&self, buf: &[u8]) -> Option<String> {
124        crate::mongodb::extract_mongo_full_command(buf)
125    }
126    fn parse_response(&self, buf: &[u8]) -> Option<String> {
127        crate::mongodb::parse_mongo_response(buf)
128    }
129    fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
130        crate::mongodb::format_mongo_response_detail(buf)
131    }
132    fn message_length(&self, buf: &[u8]) -> Option<usize> {
133        if buf.len() < 4 { return None; }
134        let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
135        if len > 0 { Some(len) } else { None }
136    }
137    fn default_port(&self) -> u16 { 27017 }
138}
139
140// ─── HTTP ───────────────────────────────────────────────────────────────────
141
142pub struct HttpHandler;
143
144
145impl ProtocolHandler for HttpHandler {
146    fn parse_request(&self, buf: &[u8]) -> Option<String> {
147        crate::http::parse_http_request(buf)
148    }
149    fn extract_full_command(&self, buf: &[u8]) -> Option<String> {
150        crate::http::extract_http_full_command(buf)
151    }
152    fn parse_response(&self, buf: &[u8]) -> Option<String> {
153        crate::http::parse_http_response(buf)
154    }
155    fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
156        crate::http::format_http_response_detail(buf)
157    }
158    fn to_replay_command(&self, ev: &ProxyEvent) -> String {
159        let dest = ev.dest.as_deref().unwrap_or("localhost");
160        let lines: Vec<&str> = ev.full_command.lines().collect();
161        let first_line = lines.first().copied().unwrap_or("");
162        let mut parts = first_line.splitn(2, ' ');
163        let method = parts.next().unwrap_or("GET");
164        let path = parts.next().unwrap_or("/");
165        let url = format!("http://{}{}", dest, path);
166        let mut curl = format!("curl -X {} '{}'", method, url);
167
168        let mut in_headers = false;
169        let mut in_body = false;
170        let mut body = String::new();
171        for line in &lines[1..] {
172            if *line == "[Request Headers]" { in_headers = true; in_body = false; continue; }
173            if *line == "[Request Body]" { in_body = true; in_headers = false; continue; }
174            if line.starts_with('[') && line.ends_with(']') { in_headers = false; in_body = false; continue; }
175            if line.is_empty() { continue; }
176            if in_headers { curl.push_str(&format!(" \\\n  -H '{}'", line)); }
177            if in_body { body.push_str(line); }
178        }
179        if !body.is_empty() { curl.push_str(&format!(" \\\n  -d '{}'", body)); }
180        curl
181    }
182    fn needs_request_buffering(&self) -> bool { true }
183    fn needs_response_buffering(&self) -> bool { true }
184    fn request_complete(&self, buf: &[u8]) -> bool {
185        crate::http::http_request_complete(buf)
186    }
187    fn response_complete(&self, buf: &[u8]) -> bool {
188        crate::http::http_response_complete(buf)
189    }
190    fn default_port(&self) -> u16 { 9200 }
191}
192
193// ─── Memcached ──────────────────────────────────────────────────────────────
194
195pub struct MemcachedHandler;
196
197impl ProtocolHandler for MemcachedHandler {
198    fn parse_request(&self, buf: &[u8]) -> Option<String> {
199        crate::memcached::parse_memcached_request(buf)
200    }
201    fn parse_response(&self, buf: &[u8]) -> Option<String> {
202        crate::memcached::parse_memcached_response(buf)
203    }
204    fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
205        crate::memcached::format_memcached_response_detail(buf)
206    }
207    fn needs_request_buffering(&self) -> bool { true }
208    fn needs_response_buffering(&self) -> bool { true }
209    fn request_complete(&self, buf: &[u8]) -> bool {
210        crate::memcached::memcached_request_complete(buf)
211    }
212    fn response_complete(&self, buf: &[u8]) -> bool {
213        crate::memcached::memcached_response_complete(buf)
214    }
215    fn default_port(&self) -> u16 { 11211 }
216}
217
218// ─── Kafka ──────────────────────────────────────────────────────────────────
219
220pub struct KafkaHandler;
221
222impl ProtocolHandler for KafkaHandler {
223    fn parse_request(&self, buf: &[u8]) -> Option<String> {
224        crate::kafka::parse_kafka_request(buf)
225    }
226    fn extract_full_command(&self, buf: &[u8]) -> Option<String> {
227        crate::kafka::extract_kafka_full_command(buf)
228    }
229    fn parse_response(&self, buf: &[u8]) -> Option<String> {
230        crate::kafka::parse_kafka_response(buf)
231    }
232    fn format_response_detail(&self, buf: &[u8]) -> Option<String> {
233        crate::kafka::format_kafka_response_detail(buf)
234    }
235    fn needs_request_buffering(&self) -> bool { true }
236    fn needs_response_buffering(&self) -> bool { true }
237    fn request_complete(&self, buf: &[u8]) -> bool {
238        crate::kafka::kafka_frame_complete(buf)
239    }
240    fn response_complete(&self, buf: &[u8]) -> bool {
241        crate::kafka::kafka_frame_complete(buf)
242    }
243    fn default_port(&self) -> u16 { 9092 }
244}