pub mod resp;
pub mod mysql;
pub mod amqp;
pub mod postgres;
pub mod mongodb;
pub mod http;
pub mod memcached;
pub mod kafka;
pub mod handler;
pub mod handlers;
pub use resp::{RespValue, parse_resp};
pub use mysql::{parse_mysql_request, parse_mysql_response};
pub use amqp::{parse_amqp_request, parse_amqp_response, format_amqp_response_detail, parse_amqp_frame, parse_amqp_request_full, is_async_method, frame_len as amqp_frame_len};
pub use handler::ProtocolHandler;
pub use handler::HandshakeAction;
pub use handlers::*;
use std::time::{Duration, SystemTime};
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
/// Connection status recorded for one entry of a [`StatusMap`].
///
/// The derived `Default` yields an "idle" state: zero active connections,
/// `has_connector == false`, and no recorded error or activity time.
#[derive(Debug, Clone, Default)]
pub struct ConnectionState {
    /// Number of connections currently counted as active.
    pub active_connections: usize,

    /// Whether a connector is present.
    ///
    /// NOTE(review): the exact meaning of "connector" is decided by code
    /// outside this file — confirm against the writers of this flag.
    pub has_connector: bool,

    /// Most recent error message, or `None` when no error has been recorded.
    pub last_error: Option<String>,

    /// Time of the most recent recorded activity, or `None` when there has
    /// been none.
    pub last_active_at: Option<SystemTime>,
}
/// Shared, lock-protected table of [`ConnectionState`] values keyed by `String`.
///
/// The `Arc` lets several owners hold the same map; the `Mutex` serializes
/// access to it.
///
/// NOTE(review): the key is presumably a component name — confirm against the
/// code that populates the map.
pub type StatusMap = Arc<Mutex<HashMap<String, ConnectionState>>>;
/// A single event record produced by the proxy.
///
/// Regular events describe one command/response exchange; events built with
/// `ProxyEvent::system_event` carry a plain message and have `system` set.
#[derive(Debug, Clone)]
pub struct ProxyEvent {
    /// When the event was recorded.
    pub timestamp: SystemTime,

    /// Name of the component the event belongs to.
    pub component: String,

    /// Protocol associated with the event.
    pub protocol: Protocol,

    /// Short command text.
    pub command: String,

    /// Full command text.
    ///
    /// NOTE(review): presumably the untruncated form of `command` — confirm
    /// against the code that fills it.
    pub full_command: String,

    /// Response summary text.
    pub response: String,

    /// Detailed response text.
    pub response_detail: String,

    /// Latency attached to the event.
    ///
    /// NOTE(review): presumably the request-to-response time — confirm against
    /// the code that measures it.
    pub latency: Duration,

    /// Optional process description.
    ///
    /// NOTE(review): presumably the originating client process — confirm.
    pub process: Option<String>,

    /// Optional source of the traffic (presumably an address — confirm).
    pub src: Option<String>,

    /// Optional destination of the traffic (presumably an address — confirm).
    pub dest: Option<String>,

    /// `true` for internally generated (system) events.
    pub system: bool,
}
impl ProxyEvent {
    /// Builds an internally generated ("system") event for `component`.
    ///
    /// `message` is stored in `command`, `full_command` and `response_detail`;
    /// `response` is left empty. The event is stamped with the current time,
    /// has zero latency, no process/source/destination, and `system == true`.
    pub fn system_event(component: &str, message: String) -> Self {
        // Take the timestamp before doing any other work.
        let timestamp = SystemTime::now();
        let command = message.clone();
        let full_command = message.clone();

        Self {
            timestamp,
            component: String::from(component),
            // NOTE(review): system events are always tagged `Protocol::Redis`;
            // this looks like a placeholder because the enum has no neutral
            // variant — confirm consumers rely on `system` rather than this.
            protocol: Protocol::Redis,
            command,
            full_command,
            response: String::new(),
            // The original message is moved here, after the two copies above.
            response_detail: message,
            latency: Duration::ZERO,
            process: None,
            src: None,
            dest: None,
            system: true,
        }
    }
}
/// Distinguishes the request side of an exchange from the response side.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// The request half of an exchange.
    Request,

    /// The response half of an exchange.
    Response,
}
/// Protocols known to this module; each one maps to a handler in `get_handler`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
    /// Redis.
    Redis,
    /// MySQL.
    Mysql,
    /// AMQP (also selected by the name `rabbitmq`).
    Amqp,
    /// PostgreSQL.
    Postgres,
    /// MongoDB.
    Mongodb,
    /// HTTP (also selected by the names `elasticsearch` and `es`).
    Http,
    /// Memcached.
    Memcached,
    /// Kafka.
    Kafka,
}

impl Protocol {
    /// Resolves a protocol name to its [`Protocol`] variant.
    ///
    /// Matching is case-insensitive: the input is lowercased before the
    /// lookup. It is not trimmed, so surrounding whitespace prevents a match.
    ///
    /// Accepted names:
    ///
    /// | Name(s)                       | Variant     |
    /// |-------------------------------|-------------|
    /// | `redis`                       | `Redis`     |
    /// | `mysql`                       | `Mysql`     |
    /// | `amqp`, `rabbitmq`            | `Amqp`      |
    /// | `postgres`, `postgresql`      | `Postgres`  |
    /// | `mongodb`, `mongo`            | `Mongodb`   |
    /// | `http`, `elasticsearch`, `es` | `Http`      |
    /// | `memcached`, `memcache`       | `Memcached` |
    /// | `kafka`                       | `Kafka`     |
    ///
    /// Returns `None` for any other input, including the empty string.
    pub fn parse(s: &str) -> Option<Self> {
        let normalized = s.to_lowercase();

        let protocol = match normalized.as_str() {
            "redis" => Self::Redis,
            "mysql" => Self::Mysql,
            "amqp" | "rabbitmq" => Self::Amqp,
            "postgres" | "postgresql" => Self::Postgres,
            "mongodb" | "mongo" => Self::Mongodb,
            "http" | "elasticsearch" | "es" => Self::Http,
            "memcached" | "memcache" => Self::Memcached,
            "kafka" => Self::Kafka,
            // Unknown name: nothing to resolve.
            _ => return None,
        };

        Some(protocol)
    }
}
/// Runs the request parser of the handler selected for `protocol` on `buf`.
///
/// This is a thin convenience wrapper: it looks the handler up with
/// `get_handler` and returns its `parse_request` result unchanged.
pub fn parse_request(protocol: Protocol, buf: &[u8]) -> Option<String> {
    let handler = get_handler(protocol);
    handler.parse_request(buf)
}
/// Asks the handler selected for `protocol` to extract the full command from `buf`.
///
/// This is a thin convenience wrapper: it looks the handler up with
/// `get_handler` and returns its `extract_full_command` result unchanged.
pub fn extract_full_command(protocol: Protocol, buf: &[u8]) -> Option<String> {
    let handler = get_handler(protocol);
    handler.extract_full_command(buf)
}
/// Runs the response parser of the handler selected for `protocol` on `buf`.
///
/// This is a thin convenience wrapper: it looks the handler up with
/// `get_handler` and returns its `parse_response` result unchanged.
pub fn parse_response(protocol: Protocol, buf: &[u8]) -> Option<String> {
    let handler = get_handler(protocol);
    handler.parse_response(buf)
}
/// Asks the handler selected for `protocol` to format the response detail for `buf`.
///
/// This is a thin convenience wrapper: it looks the handler up with
/// `get_handler` and returns its `format_response_detail` result unchanged.
pub fn format_response_detail(protocol: Protocol, buf: &[u8]) -> Option<String> {
    let handler = get_handler(protocol);
    handler.format_response_detail(buf)
}
/// Returns the [`ProtocolHandler`] implementation that serves `protocol`.
///
/// Every [`Protocol`] variant maps to exactly one handler. The match is kept
/// exhaustive (no wildcard arm) so that adding a variant without a handler is
/// a compile error.
pub fn get_handler(protocol: Protocol) -> &'static dyn ProtocolHandler {
    let handler: &'static dyn ProtocolHandler = match protocol {
        Protocol::Redis => &RedisHandler,
        Protocol::Mysql => &MysqlHandler,
        Protocol::Amqp => &AmqpHandler,
        Protocol::Postgres => &PostgresHandler,
        Protocol::Mongodb => &MongodbHandler,
        Protocol::Http => &HttpHandler,
        Protocol::Memcached => &MemcachedHandler,
        Protocol::Kafka => &KafkaHandler,
    };

    handler
}