Skip to main content

relay_core_api/
flow.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Deserializer, Serialize, Serializer};
3use std::any::Any;
4use std::collections::HashMap;
5use std::sync::Arc;
6use url::Url;
7use uuid::Uuid;
8
9/// Trait for custom protocol layers to implement
10pub trait ProtocolLayer: Any + Send + Sync + std::fmt::Debug {
11    fn protocol_name(&self) -> &str;
12    fn as_any(&self) -> &dyn Any;
13    fn to_json(&self) -> serde_json::Value;
14}
15
16// Custom serializer for Arc<dyn ProtocolLayer>
17fn serialize_custom_layer<S>(
18    layer: &Arc<dyn ProtocolLayer>,
19    serializer: S,
20) -> Result<S::Ok, S::Error>
21where
22    S: Serializer,
23{
24    use serde::ser::SerializeStruct;
25    let mut state = serializer.serialize_struct("CustomLayer", 2)?;
26    state.serialize_field("protocol", layer.protocol_name())?;
27    state.serialize_field("data", &layer.to_json())?;
28    state.end()
29}
30
31// Custom deserializer for Arc<dyn ProtocolLayer>
32fn deserialize_custom_layer<'de, D>(deserializer: D) -> Result<Arc<dyn ProtocolLayer>, D::Error>
33where
34    D: Deserializer<'de>,
35{
36    let value = serde_json::Value::deserialize(deserializer)?;
37
38    // Try to extract protocol and data
39    let protocol = value
40        .get("protocol")
41        .and_then(|v| v.as_str())
42        .unwrap_or("unknown")
43        .to_string();
44
45    let data = value
46        .get("data")
47        .cloned()
48        .unwrap_or(serde_json::Value::Null);
49
50    Ok(Arc::new(GenericProtocolLayer { protocol, data }))
51}
52
53#[derive(Debug)]
54struct GenericProtocolLayer {
55    protocol: String,
56    data: serde_json::Value,
57}
58
59impl ProtocolLayer for GenericProtocolLayer {
60    fn protocol_name(&self) -> &str {
61        &self.protocol
62    }
63    fn as_any(&self) -> &dyn Any {
64        self
65    }
66    fn to_json(&self) -> serde_json::Value {
67        self.data.clone()
68    }
69}
70
71/// The central data structure representing a captured traffic flow.
72/// Designed to support L3/L4 layers initially, with L7 (HTTP) as a specialized layer.
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct Flow {
75    pub id: Uuid,
76    pub start_time: DateTime<Utc>,
77    pub end_time: Option<DateTime<Utc>>,
78
79    /// L3/L4 Connection Info (IP, Port, Protocol)
80    pub network: NetworkInfo,
81
82    /// The application layer protocol detected or parsed
83    pub layer: Layer,
84
85    /// Analysis tags (e.g., "error", "large-body", "injected")
86    pub tags: Vec<String>,
87
88    /// Internal processing metadata (not necessarily for UI)
89    #[serde(skip)]
90    pub meta: HashMap<String, String>,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct NetworkInfo {
95    pub client_ip: String,
96    pub client_port: u16,
97    pub server_ip: String,
98    pub server_port: u16,
99    pub protocol: TransportProtocol,
100    pub tls: bool,
101    pub tls_version: Option<String>,
102    pub sni: Option<String>,
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
106pub enum TransportProtocol {
107    TCP,
108    UDP,
109    ICMP,
110    Unknown,
111}
112
113/// The specific application layer data.
114/// Using an enum allows us to support HTTP now, but easily add DNS, WebSocket,
115/// or raw TCP streams later without breaking the top-level Flow structure.
116#[derive(Debug, Clone, Serialize, Deserialize)]
117#[serde(tag = "type", content = "data")]
118pub enum Layer {
119    Http(HttpLayer),
120    WebSocket(WebSocketLayer),
121    Tcp(TcpLayer),   // For raw TCP flows without L7 parsing
122    Udp(UdpLayer),   // For raw UDP flows
123    Quic(QuicLayer), // For QUIC flows
124    #[serde(
125        serialize_with = "serialize_custom_layer",
126        deserialize_with = "deserialize_custom_layer"
127    )]
128    Custom(Arc<dyn ProtocolLayer>),
129    Unknown,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct UdpLayer {
134    pub payload_size: usize,
135    pub packet_count: usize,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139pub struct QuicLayer {
140    pub sni: Option<String>,
141    pub alpn: Option<String>,
142    pub version: Option<String>,
143}
144
145// --- HTTP Layer ---
146
147#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct HttpLayer {
149    pub request: HttpRequest,
150    pub response: Option<HttpResponse>,
151    pub error: Option<String>,
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct HttpRequest {
156    pub method: String,
157    pub url: Url,
158    pub version: String,                // HTTP/1.1, HTTP/2
159    pub headers: Vec<(String, String)>, // Vec preserves order, unlike HashMap
160    pub cookies: Vec<Cookie>,
161    pub query: Vec<(String, String)>,
162    pub body: Option<BodyData>,
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct HttpResponse {
167    pub status: u16,
168    pub status_text: String,
169    pub version: String,
170    pub headers: Vec<(String, String)>,
171    pub cookies: Vec<Cookie>,
172    pub body: Option<BodyData>,
173    pub timing: ResponseTiming,
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize)]
177pub struct Cookie {
178    pub name: String,
179    pub value: String,
180    pub path: Option<String>,
181    pub domain: Option<String>,
182    pub expires: Option<String>,
183    pub http_only: Option<bool>,
184    pub secure: Option<bool>,
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct ResponseTiming {
189    pub time_to_first_byte: Option<u64>, // ms
190    pub time_to_last_byte: Option<u64>,  // ms
191    /// TCP connect time in milliseconds for the upstream connection (new connections only).
192    pub connect_time_ms: Option<u64>,
193    /// TLS handshake time in milliseconds for the upstream connection (new connections only).
194    pub ssl_time_ms: Option<u64>,
195}
196
197// --- WebSocket Layer ---
198
199#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct WebSocketLayer {
201    pub handshake_request: HttpRequest,
202    pub handshake_response: HttpResponse,
203    pub messages: Vec<WebSocketMessage>,
204    pub closed: bool,
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct WebSocketMessage {
209    pub id: Uuid,
210    pub timestamp: DateTime<Utc>,
211    pub direction: Direction, // ClientToServer, ServerToClient
212    pub content: BodyData,
213    pub opcode: String, // Text, Binary, Ping, Pong
214}
215
216#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
217pub enum Direction {
218    ClientToServer,
219    ServerToClient,
220}
221
222// --- Raw TCP Layer ---
223
224#[derive(Debug, Clone, Serialize, Deserialize)]
225pub struct TcpLayer {
226    pub bytes_up: u64,
227    pub bytes_down: u64,
228    // Future: packets or data chunks
229}
230
231// --- Shared ---
232
233#[derive(Debug, Clone, Serialize, Deserialize)]
234pub struct BodyData {
235    /// Encoding (e.g., "utf-8", "base64")
236    pub encoding: String,
237    /// The actual content. If binary, it should be base64 encoded string.
238    pub content: String,
239    /// Original size in bytes before any decoding/unzipping
240    pub size: u64,
241}
242
243/// Enum for Incremental Flow Updates to avoid cloning the entire Flow object
244#[derive(Debug, Clone, Serialize, Deserialize)]
245#[serde(tag = "type", content = "data")]
246pub enum FlowUpdate {
247    /// Full Flow update (e.g., initial creation, request/response headers)
248    Full(Box<Flow>),
249    /// Incremental WebSocket Message
250    WebSocketMessage {
251        flow_id: String,
252        message: WebSocketMessage,
253    },
254    /// Incremental HTTP Body Update (Request or Response)
255    HttpBody {
256        flow_id: String,
257        direction: Direction,
258        body: BodyData,
259    },
260}