1pub mod resp;
2pub mod mysql;
3pub mod amqp;
4pub mod postgres;
5pub mod mongodb;
6pub mod http;
7pub mod memcached;
8pub mod kafka;
9pub mod handler;
10pub mod handlers;
11
12pub use resp::{RespValue, parse_resp};
13pub use mysql::{parse_mysql_request, parse_mysql_response};
14pub use amqp::{parse_amqp_request, parse_amqp_response, format_amqp_response_detail, parse_amqp_frame, parse_amqp_request_full, is_async_method, frame_len as amqp_frame_len};
15pub use handler::ProtocolHandler;
16pub use handler::HandshakeAction;
17pub use handlers::*;
18
19use std::time::{Duration, SystemTime};
20use std::sync::{Arc, Mutex};
21use std::collections::HashMap;
22
23#[derive(Debug, Clone, Default)]
25pub struct ConnectionState {
26 pub active_connections: usize,
27 pub has_connector: bool,
28 pub last_error: Option<String>,
29 pub last_active_at: Option<SystemTime>,
30}
31
32pub type StatusMap = Arc<Mutex<HashMap<String, ConnectionState>>>;
34
35#[derive(Debug, Clone)]
37pub struct ProxyEvent {
38 pub timestamp: SystemTime,
39 pub component: String,
40 pub protocol: Protocol,
41 pub command: String,
43 pub full_command: String,
45 pub response: String,
47 pub response_detail: String,
49 pub latency: Duration,
51 pub process: Option<String>,
53 pub src: Option<String>,
55 pub dest: Option<String>,
57 pub system: bool,
59}
60
61impl ProxyEvent {
62 pub fn system_event(component: &str, message: String) -> Self {
64 Self {
65 timestamp: SystemTime::now(),
66 component: component.to_string(),
67 protocol: Protocol::Redis, command: message.clone(),
69 full_command: message.clone(),
70 response: String::new(),
71 response_detail: message,
72 latency: Duration::ZERO,
73 process: None,
74 src: None,
75 dest: None,
76 system: true,
77 }
78 }
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub enum Direction {
83 Request,
84 Response,
85}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq)]
88pub enum Protocol {
89 Redis,
90 Mysql,
91 Amqp,
92 Postgres,
93 Mongodb,
94 Http,
95 Memcached,
96 Kafka,
97}
98
99impl Protocol {
100 pub fn parse(s: &str) -> Option<Self> {
101 match s.to_lowercase().as_str() {
102 "redis" => Some(Protocol::Redis),
103 "mysql" => Some(Protocol::Mysql),
104 "amqp" | "rabbitmq" => Some(Protocol::Amqp),
105 "postgres" | "postgresql" => Some(Protocol::Postgres),
106 "mongodb" | "mongo" => Some(Protocol::Mongodb),
107 "http" | "elasticsearch" | "es" => Some(Protocol::Http),
108 "memcached" | "memcache" => Some(Protocol::Memcached),
109 "kafka" => Some(Protocol::Kafka),
110 _ => None,
111 }
112 }
113}
114
115pub fn parse_request(protocol: Protocol, buf: &[u8]) -> Option<String> {
117 get_handler(protocol).parse_request(buf)
118}
119
120pub fn extract_full_command(protocol: Protocol, buf: &[u8]) -> Option<String> {
122 get_handler(protocol).extract_full_command(buf)
123}
124
125pub fn parse_response(protocol: Protocol, buf: &[u8]) -> Option<String> {
127 get_handler(protocol).parse_response(buf)
128}
129
130pub fn format_response_detail(protocol: Protocol, buf: &[u8]) -> Option<String> {
132 get_handler(protocol).format_response_detail(buf)
133}
134
135pub fn get_handler(protocol: Protocol) -> &'static dyn ProtocolHandler {
137 match protocol {
138 Protocol::Redis => &RedisHandler,
139 Protocol::Mysql => &MysqlHandler,
140 Protocol::Amqp => &AmqpHandler,
141 Protocol::Postgres => &PostgresHandler,
142 Protocol::Mongodb => &MongodbHandler,
143 Protocol::Http => &HttpHandler,
144 Protocol::Memcached => &MemcachedHandler,
145 Protocol::Kafka => &KafkaHandler,
146 }
147}