use std::sync::Arc;
use crate::tcp_proxy::h2_parser::H2Event;
use crate::types::odd_box_event::GlobalEvent::*;
use crate::{global_state::GlobalState, types::odd_box_event::TCPEvent};
pub async fn run(state:Arc<GlobalState>) {
let mut receiver = state.global_broadcast_channel.subscribe();
let observer = &state.monitoring_station;
let liveness_token: Arc<bool> = Arc::new(true);
{
crate::BG_WORKER_THREAD_MAP.insert("The Observer".into(), crate::types::proc_info::BgTaskInfo {
liveness_ptr: Arc::downgrade(&liveness_token),
status: "Alive".into()
});
}
loop {
if true == state.app_state.exit.load(std::sync::atomic::Ordering::Relaxed) {
break
}
if let Ok(msg) = receiver.recv().await {
if !state.app_state.enable_global_traffic_inspection.load(std::sync::atomic::Ordering::Relaxed) {
observer.reset();
continue
}
match msg {
GotResponseFromBackend(k,data) => {
observer.push_extra(&k, &format!("Odd-Box got a response from backend service: {}",&data),false); },
SentHttpRequestToBackend(k,data) => {
observer.push_extra(&k, &format!("Odd-Box sent a request to the backend service - {}",&data),true); }
TcpEvent(TCPEvent::Close(key)) => {
_ = observer.tcp_connections.remove(&key);
}
TcpEvent(e) => observer.push(e)
}
}
}
}
fn check_if_grpc(event:&H2Event,decode_as_grpc:bool) -> bool {
let mut is_grpc = decode_as_grpc;
match event {
crate::tcp_proxy::h2_parser::H2Event::IncomingHeaders { stream_id:_, headers } => {
if decode_as_grpc == false {
if let Some(content_type) = headers.get("content-type") {
if content_type.starts_with("application/grpc") {
is_grpc = true;
}
}
}
}
crate::tcp_proxy::h2_parser::H2Event::IncomingRequest(http_request) => {
if decode_as_grpc == false {
if let Some(content_type) = http_request.headers.get("content-type") {
if content_type.starts_with("application/grpc") {
is_grpc = true;
}
}
}
}
_e => {
}
}
is_grpc
}
pub mod obs {
use std::time::SystemTime;
use dashmap::DashMap;
use futures::{FutureExt, StreamExt};
use serde::Serialize;
use tracing::trace;
use crate::{tcp_proxy::{h1_parser::{H1Observer, HttpData}, h2_parser::{H2Event, H2Observer}}, types::{odd_box_event::EventForWebsocketClients, proxy_state::{ConnectionKey, ProxyActiveTCPConnection}}};
#[derive(Debug,Clone,Serialize)]
pub enum DecodedPacket {
Http0or1(HttpData),
Http2(H2Event),
Unknown(String),
BackendToOddBox(ConnectionKey,DataTrans),
OddBoxToBackend(ConnectionKey,DataTrans),
OddBoxToClient(ConnectionKey,DataTrans),
ClientToOddBox(ConnectionKey,DataTrans),
}
#[derive(Debug,Clone,Serialize)]
pub struct DataTrans {
pub http2_stream_id : Option<u64>,
pub data : String }
#[derive(Debug)]
pub struct TCPConnection {
pub id : ConnectionKey,
pub packets : Vec<DecodedPacket>,
pub state : TCPConnectionState,
pub connection : ProxyActiveTCPConnection,
pub extra_log : Vec<String>,
pub bytes_sent : usize,
pub bytes_rec : usize,
pub created_timestamp : std::time::SystemTime,
pub local_process_name_and_pid : Option<(String,i32)>
}
#[derive(Debug)]
pub enum TCPConnectionState {
Open,
Closed
}
#[derive(Debug)]
pub struct MonitoringStation {
pub tcp_connections : DashMap<ConnectionKey,TCPConnection>,
h2_observers : DashMap::<ConnectionKey,H2Observer>,
h1_observers :DashMap::<ConnectionKey,H1Observer>,
wsb: tokio::sync::broadcast::Sender<EventForWebsocketClients>
}
impl MonitoringStation {
pub fn new(wsb:tokio::sync::broadcast::Sender<EventForWebsocketClients>) -> Self {
Self {
tcp_connections: DashMap::new(),
h1_observers: DashMap::new(),
h2_observers: DashMap::new(),
wsb
}
}
pub fn reset(&self) {
self.h1_observers.clear();
self.h1_observers.shrink_to_fit();
self.h2_observers.clear();
self.h2_observers.shrink_to_fit();
self.tcp_connections.clear();
self.tcp_connections.shrink_to_fit();
}
fn update_connection(&self,id:&ConnectionKey,f: impl FnOnce(&mut TCPConnection) -> ()) {
if let Some(mut c) = self.tcp_connections.get_mut(id) {
f(&mut c)
}
}
pub fn push_extra(&self,id:&ConnectionKey,data:&str,outgoing:bool) {
self.update_connection(id, |c| {
c.extra_log.push(data.to_string());
});
if outgoing {
trace!("Connection {id}: sent data to backend using backchannel! data: {:?}",data);
_ = self.wsb.send(EventForWebsocketClients::SentReqToBackend(*id, data.to_string()));
} else {
trace!("Connection {id}: received data from backend using backchannel! data: {:?}",data);
_ = self.wsb.send(EventForWebsocketClients::ReceivedResFromBackend(*id, data.to_string()));
}
}
fn handle_packet(
&self,tcp_connection_id:u64,
packet:&DecodedPacket,
) {
match packet {
DecodedPacket::Http2(h2_event) => {
trace!("Connection {tcp_connection_id}: {}",h2_event);
_ = self.wsb.send(EventForWebsocketClients::Http2Event(tcp_connection_id,h2_event.stream_id(), packet.clone()));
},
DecodedPacket::Http0or1(h1_event) => {
trace!("Connection {tcp_connection_id} {}",h1_event);
_ = self.wsb.send(EventForWebsocketClients::Http1Event(tcp_connection_id,packet.clone()));
}
x => {
trace!("Connection {tcp_connection_id}: {:?}",x);
_ = self.wsb.send(EventForWebsocketClients::Unknown(tcp_connection_id,format!("{:?}",x)));
}
};
}
pub fn push(&self,event:super::TCPEvent) {
match event {
crate::types::odd_box_event::TCPEvent::Open(proxy_active_tcpconnection) => {
if let Some(ls) = proxy_active_tcpconnection.client_socket_address {
if let Some(odd_box_socket) = proxy_active_tcpconnection.odd_box_socket {
if let Ok(Some((process_name,process_id))) = crate::tcp_pid::get_process_by_socket(&ls,&odd_box_socket) {
self.update_connection(&proxy_active_tcpconnection.connection_key, |c|{
c.local_process_name_and_pid = Some((process_name,process_id))
});
}
}
}
},
crate::types::odd_box_event::TCPEvent::Close(k) => {
if let Some(mut c) = self.tcp_connections.get_mut(&k) {
c.state = TCPConnectionState::Closed
}
},
crate::types::odd_box_event::TCPEvent::Update(proxy_active_tcpconnection) => {
if let Some(mut c) = self.tcp_connections.get_mut(&proxy_active_tcpconnection.connection_key) {
c.connection = proxy_active_tcpconnection;
} else {
self.tcp_connections.insert(proxy_active_tcpconnection.connection_key, TCPConnection {
id: proxy_active_tcpconnection.connection_key,
packets: vec![],
extra_log: vec![],
state: TCPConnectionState::Open,
connection: proxy_active_tcpconnection,
created_timestamp: SystemTime::now(),
bytes_rec: 0,
bytes_sent: 0,
local_process_name_and_pid:None
});
}
},
crate::types::odd_box_event::TCPEvent::RawBytesFromOddBoxToClient(key,is_http2,data) => {
if is_http2 {
let mut x: dashmap::mapref::one::RefMut<'_, u64, _> = self.h2_observers.entry(key).or_default();
x.write_outgoing(&data);
let (_,b) = x.pair_mut();
let mut is_grpc = false;
let mut events = vec![];
while let Some(Some(event)) = b.next().now_or_never() {
let result = is_grpc || super::check_if_grpc(&event,is_grpc);
is_grpc = result;
events.push(event);
}
self.update_connection(&key, |c| {
c.bytes_sent += data.len();
if is_grpc && c.connection.is_grpc.is_none() {
c.connection.is_grpc = Some(true)
}
for e in events {
let p = DecodedPacket::Http2(e);
self.handle_packet(key,&p);
c.packets.push(p)
}
});
} else {
let mut x = self.h1_observers.entry(key).or_default();
let (_,v) = x.pair_mut();
v.push(crate::tcp_proxy::h1_parser::DataDirection::ServerToClient, &data);
self.update_connection(&key, |c|{
c.bytes_sent += data.len();
let events = v.parse();
for e in events {
let p = DecodedPacket::Http0or1(e);
self.handle_packet(key,&p);
c.packets.push(p)
}
});
}
},
crate::types::odd_box_event::TCPEvent::RawBytesFromClientToOddBox(key,is_http2,data) => {
if is_http2 {
let mut x = self.h2_observers.entry(key).or_default();
x.write_incoming(&data);
let mut is_grpc = false;
let mut events = vec![];
while let Some(Some(event)) = x.next().now_or_never() {
let result = is_grpc || super::check_if_grpc(&event,is_grpc);
is_grpc = result;
events.push(event);
}
self.update_connection(&key, |c| {
c.bytes_rec += data.len();
for event in events {
let p = DecodedPacket::Http2(event);
self.handle_packet(key,&p);
c.packets.push(p);
}
if is_grpc {
c.connection.is_grpc = Some(true);
}
});
} else {
let mut x = self.h1_observers.entry(key).or_default();
let (_,v) = x.pair_mut();
v.push(crate::tcp_proxy::h1_parser::DataDirection::ClientToServer, &data);
let events = v.parse();
let mut is_ws = false;
self.update_connection(&key, |c| {
c.bytes_rec += data.len();
for event in events {
match event {
crate::tcp_proxy::h1_parser::HttpData::ProtocolSwitchedToWebSocket => {
is_ws = true;
break;
},
_ => {}
}
let p = DecodedPacket::Http0or1(event);
self.handle_packet(key,&p);
c.packets.push(p);
}
if is_ws {
c.connection.is_websocket = Some(true);
}
});
}
},
}
}
}
}