Skip to main content

ocular_protocol/
lib.rs

1pub mod resp;
2pub mod mysql;
3pub mod amqp;
4pub mod postgres;
5pub mod mongodb;
6pub mod http;
7pub mod memcached;
8pub mod kafka;
9pub mod handler;
10pub mod handlers;
11
12pub use resp::{RespValue, parse_resp};
13pub use mysql::{parse_mysql_request, parse_mysql_response};
14pub use amqp::{parse_amqp_request, parse_amqp_response, format_amqp_response_detail, parse_amqp_frame, parse_amqp_request_full, is_async_method, frame_len as amqp_frame_len};
15pub use handler::ProtocolHandler;
16pub use handler::HandshakeAction;
17pub use handlers::*;
18
19use std::time::{Duration, SystemTime};
20use std::sync::{Arc, Mutex};
21use std::collections::HashMap;
22
/// Connection status of one proxy/capture component, shared with the TUI for
/// status display.
///
/// The derived `Default` produces a state with zero active connections,
/// `has_connector == false`, and neither an error nor an activity timestamp.
#[derive(Clone, Debug, Default)]
pub struct ConnectionState {
    /// Count of active connections (updated by code outside this file).
    pub active_connections: usize,
    /// Connector flag. NOTE(review): exact semantics are set by the writer of
    /// this field, which is not visible here — confirm against the caller.
    pub has_connector: bool,
    /// Text of the last recorded error, or `None` if there is none.
    pub last_error: Option<String>,
    /// Time of the last recorded activity, or `None` if there is none.
    pub last_active_at: Option<SystemTime>,
}
31
/// Shared map from component name to connection state.
///
/// Keys are `String` component names and values are [`ConnectionState`]
/// records. The `Arc<Mutex<..>>` wrapper allows the same map to be held by
/// several owners, with access going through the mutex.
pub type StatusMap = Arc<Mutex<HashMap<String, ConnectionState>>>;
34
/// A single request→response event (merged): one record carries both the
/// request summary and the response summary, plus the latency between them.
///
/// The same type is also used for system events (errors/warnings surfaced to
/// the TUI); those are marked with `system == true`.
#[derive(Debug, Clone)]
pub struct ProxyEvent {
    /// Time attached to the event (`system_event` uses `SystemTime::now()`).
    pub timestamp: SystemTime,
    /// Name of the component this event belongs to.
    pub component: String,
    /// Protocol of the event; system events carry a placeholder value here.
    pub protocol: Protocol,
    /// The command/SQL sent (request summary, truncated for display)
    pub command: String,
    /// Full command extracted from raw request (no truncation)
    pub full_command: String,
    /// Response summary (e.g. "OK", "ResultSet (19 rows, ...)")
    pub response: String,
    /// Formatted response detail for the detail panel
    pub response_detail: String,
    /// Request→response latency
    pub latency: Duration,
    /// Process that initiated the connection (PID + name), when known
    pub process: Option<String>,
    /// Client address (source), when known
    pub src: Option<String>,
    /// Remote address (destination), when known
    pub dest: Option<String>,
    /// Whether this is a system event (error/warning surfaced to TUI)
    pub system: bool,
}
60
61impl ProxyEvent {
62    /// Create a system event (error/warning) for display in the events panel
63    pub fn system_event(component: &str, message: String) -> Self {
64        Self {
65            timestamp: SystemTime::now(),
66            component: component.to_string(),
67            protocol: Protocol::Redis, // unused for system events
68            command: message.clone(),
69            full_command: message.clone(),
70            response: String::new(),
71            response_detail: message,
72            latency: Duration::ZERO,
73            process: None,
74            src: None,
75            dest: None,
76            system: true,
77        }
78    }
79}
80
/// Distinguishes the two halves of an exchange: a request or a response.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Direction {
    /// The request half.
    Request,
    /// The response half.
    Response,
}
86
/// Protocols this crate has a handler for.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Protocol {
    /// Parsed from `"redis"`.
    Redis,
    /// Parsed from `"mysql"`.
    Mysql,
    /// Parsed from `"amqp"` or `"rabbitmq"`.
    Amqp,
    /// Parsed from `"postgres"` or `"postgresql"`.
    Postgres,
    /// Parsed from `"mongodb"` or `"mongo"`.
    Mongodb,
    /// Parsed from `"http"`, `"elasticsearch"`, or `"es"`.
    Http,
    /// Parsed from `"memcached"` or `"memcache"`.
    Memcached,
    /// Parsed from `"kafka"`.
    Kafka,
}

impl Protocol {
    /// Resolve a protocol name or alias to its [`Protocol`] variant.
    ///
    /// Matching is case-insensitive (the input is lowercased first) and
    /// exact: surrounding whitespace is not trimmed. Returns `None` when the
    /// name is not one of the known aliases.
    pub fn parse(s: &str) -> Option<Self> {
        // Every accepted spelling, already lowercase, paired with its variant.
        const ALIASES: &[(&str, Protocol)] = &[
            ("redis", Protocol::Redis),
            ("mysql", Protocol::Mysql),
            ("amqp", Protocol::Amqp),
            ("rabbitmq", Protocol::Amqp),
            ("postgres", Protocol::Postgres),
            ("postgresql", Protocol::Postgres),
            ("mongodb", Protocol::Mongodb),
            ("mongo", Protocol::Mongodb),
            ("http", Protocol::Http),
            ("elasticsearch", Protocol::Http),
            ("es", Protocol::Http),
            ("memcached", Protocol::Memcached),
            ("memcache", Protocol::Memcached),
            ("kafka", Protocol::Kafka),
        ];

        let wanted = s.to_lowercase();
        ALIASES
            .iter()
            .find(|(alias, _)| *alias == wanted.as_str())
            .map(|&(_, protocol)| protocol)
    }
}
114
115/// Parse request bytes, returning a human-readable summary (truncated)
116pub fn parse_request(protocol: Protocol, buf: &[u8]) -> Option<String> {
117    get_handler(protocol).parse_request(buf)
118}
119
120/// Extract the full command/SQL from raw bytes (no truncation)
121pub fn extract_full_command(protocol: Protocol, buf: &[u8]) -> Option<String> {
122    get_handler(protocol).extract_full_command(buf)
123}
124
125/// Parse response bytes, returning a short summary
126pub fn parse_response(protocol: Protocol, buf: &[u8]) -> Option<String> {
127    get_handler(protocol).parse_response(buf)
128}
129
130/// Parse response bytes into a detailed display string (for detail panel)
131pub fn format_response_detail(protocol: Protocol, buf: &[u8]) -> Option<String> {
132    get_handler(protocol).format_response_detail(buf)
133}
134
135/// Get the protocol handler for a given protocol.
136pub fn get_handler(protocol: Protocol) -> &'static dyn ProtocolHandler {
137    match protocol {
138        Protocol::Redis => &RedisHandler,
139        Protocol::Mysql => &MysqlHandler,
140        Protocol::Amqp => &AmqpHandler,
141        Protocol::Postgres => &PostgresHandler,
142        Protocol::Mongodb => &MongodbHandler,
143        Protocol::Http => &HttpHandler,
144        Protocol::Memcached => &MemcachedHandler,
145        Protocol::Kafka => &KafkaHandler,
146    }
147}