1pub mod resp;
2pub mod mysql;
3pub mod amqp;
4pub mod postgres;
5pub mod mongodb;
6pub mod http;
7pub mod memcached;
8pub mod kafka;
9pub mod handler;
10pub mod handlers;
11
12pub use resp::{RespValue, parse_resp};
13pub use mysql::{parse_mysql_request, parse_mysql_response};
14pub use amqp::{parse_amqp_request, parse_amqp_response, format_amqp_response_detail, parse_amqp_frame, parse_amqp_request_full, is_async_method, frame_len as amqp_frame_len};
15pub use handler::ProtocolHandler;
16pub use handlers::*;
17
18use std::time::{Duration, SystemTime};
19
20#[derive(Debug, Clone)]
22pub struct ProxyEvent {
23 pub timestamp: SystemTime,
24 pub component: String,
25 pub protocol: Protocol,
26 pub command: String,
28 pub full_command: String,
30 pub response: String,
32 pub response_detail: String,
34 pub latency: Duration,
36 pub process: Option<String>,
38 pub src: Option<String>,
40 pub dest: Option<String>,
42 pub system: bool,
44}
45
46impl ProxyEvent {
47 pub fn system_event(component: &str, message: String) -> Self {
49 Self {
50 timestamp: SystemTime::now(),
51 component: component.to_string(),
52 protocol: Protocol::Redis, command: message.clone(),
54 full_command: message.clone(),
55 response: String::new(),
56 response_detail: message,
57 latency: Duration::ZERO,
58 process: None,
59 src: None,
60 dest: None,
61 system: true,
62 }
63 }
64}
65
/// The two message directions: request or response.
///
/// NOTE(review): no use of this type is visible in this part of the file;
/// presumably it tags which side of an exchange a buffer belongs to — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// The request side of an exchange.
    Request,
    /// The response side of an exchange.
    Response,
}
71
/// Protocols for which `get_handler` returns a handler.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
    /// Selected by the name `"redis"`.
    Redis,
    /// Selected by the name `"mysql"`.
    Mysql,
    /// Selected by `"amqp"` or `"rabbitmq"`.
    Amqp,
    /// Selected by `"postgres"` or `"postgresql"`.
    Postgres,
    /// Selected by `"mongodb"` or `"mongo"`.
    Mongodb,
    /// Selected by `"http"`, `"elasticsearch"`, or `"es"`.
    Http,
    /// Selected by `"memcached"` or `"memcache"`.
    Memcached,
    /// Selected by the name `"kafka"`.
    Kafka,
}

impl Protocol {
    /// Resolves a protocol name to a [`Protocol`], ignoring letter case.
    ///
    /// The input is lowercased and compared against a fixed list of names and
    /// aliases (see the variant docs). No trimming is performed, so
    /// surrounding whitespace makes the lookup fail.
    ///
    /// Returns `None` when the name is not recognised.
    pub fn parse(s: &str) -> Option<Self> {
        let lowered = s.to_lowercase();

        let protocol = match lowered.as_str() {
            "redis" => Self::Redis,
            "mysql" => Self::Mysql,
            "amqp" | "rabbitmq" => Self::Amqp,
            "postgres" | "postgresql" => Self::Postgres,
            "mongodb" | "mongo" => Self::Mongodb,
            "http" | "elasticsearch" | "es" => Self::Http,
            "memcached" | "memcache" => Self::Memcached,
            "kafka" => Self::Kafka,
            // Anything else is an unknown protocol name.
            _ => return None,
        };

        Some(protocol)
    }
}
99
100pub fn parse_request(protocol: Protocol, buf: &[u8]) -> Option<String> {
102 get_handler(protocol).parse_request(buf)
103}
104
105pub fn extract_full_command(protocol: Protocol, buf: &[u8]) -> Option<String> {
107 get_handler(protocol).extract_full_command(buf)
108}
109
110pub fn parse_response(protocol: Protocol, buf: &[u8]) -> Option<String> {
112 get_handler(protocol).parse_response(buf)
113}
114
115pub fn format_response_detail(protocol: Protocol, buf: &[u8]) -> Option<String> {
117 get_handler(protocol).format_response_detail(buf)
118}
119
120pub fn get_handler(protocol: Protocol) -> &'static dyn ProtocolHandler {
122 match protocol {
123 Protocol::Redis => &RedisHandler,
124 Protocol::Mysql => &MysqlHandler,
125 Protocol::Amqp => &AmqpHandler,
126 Protocol::Postgres => &PostgresHandler,
127 Protocol::Mongodb => &MongodbHandler,
128 Protocol::Http => &HttpHandler,
129 Protocol::Memcached => &MemcachedHandler,
130 Protocol::Kafka => &KafkaHandler,
131 }
132}