Skip to main content

relay_core_api/
flow.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Deserializer, Serialize, Serializer};
3use std::any::Any;
4use std::collections::HashMap;
5use std::sync::Arc;
6use url::Url;
7use uuid::Uuid;
8
9/// Trait for custom protocol layers to implement
10pub trait ProtocolLayer: Any + Send + Sync + std::fmt::Debug {
11    fn protocol_name(&self) -> &str;
12    fn as_any(&self) -> &dyn Any;
13    fn to_json(&self) -> serde_json::Value;
14}
15
16// Custom serializer for Arc<dyn ProtocolLayer>
17fn serialize_custom_layer<S>(
18    layer: &Arc<dyn ProtocolLayer>,
19    serializer: S,
20) -> Result<S::Ok, S::Error>
21where
22    S: Serializer,
23{
24    use serde::ser::SerializeStruct;
25    let mut state = serializer.serialize_struct("CustomLayer", 2)?;
26    state.serialize_field("protocol", layer.protocol_name())?;
27    state.serialize_field("data", &layer.to_json())?;
28    state.end()
29}
30
31// Custom deserializer for Arc<dyn ProtocolLayer>
32fn deserialize_custom_layer<'de, D>(deserializer: D) -> Result<Arc<dyn ProtocolLayer>, D::Error>
33where
34    D: Deserializer<'de>,
35{
36    let value = serde_json::Value::deserialize(deserializer)?;
37
38    // Try to extract protocol and data
39    let protocol = value
40        .get("protocol")
41        .and_then(|v| v.as_str())
42        .unwrap_or("unknown")
43        .to_string();
44
45    let data = value
46        .get("data")
47        .cloned()
48        .unwrap_or(serde_json::Value::Null);
49
50    Ok(Arc::new(GenericProtocolLayer { protocol, data }))
51}
52
53#[derive(Debug)]
54struct GenericProtocolLayer {
55    protocol: String,
56    data: serde_json::Value,
57}
58
59impl ProtocolLayer for GenericProtocolLayer {
60    fn protocol_name(&self) -> &str {
61        &self.protocol
62    }
63    fn as_any(&self) -> &dyn Any {
64        self
65    }
66    fn to_json(&self) -> serde_json::Value {
67        self.data.clone()
68    }
69}
70
/// `skip_serializing_if` predicate: `true` when the counter is zero.
fn u32_is_zero(v: &u32) -> bool {
    matches!(*v, 0)
}
74
75/// Structured trace of resilience-related events during a proxy request.
76#[derive(Debug, Clone, Serialize, Deserialize, Default)]
77pub struct ResilienceTrace {
78    /// Number of retry attempts for this request.
79    /// Currently unused; reserved for future retry support.
80    #[serde(default, skip_serializing_if = "u32_is_zero")]
81    pub retry_count: u32,
82    /// Whether the circuit breaker was open when this request was attempted.
83    #[serde(default)]
84    pub circuit_open: bool,
85    /// Whether the request body budget was exceeded (rules skipped).
86    #[serde(default)]
87    pub budget_exceeded: bool,
88    /// Upstream errors encountered (non-2xx, connection errors).
89    #[serde(default, skip_serializing_if = "Vec::is_empty")]
90    pub upstream_errors: Vec<String>,
91    /// P3: Timeout classification (e.g., "connect", "read", "total", "tls").
92    #[serde(default, skip_serializing_if = "Option::is_none")]
93    pub timeout_type: Option<String>,
94}
95
96/// The central data structure representing a captured traffic flow.
97/// Designed to support L3/L4 layers initially, with L7 (HTTP) as a specialized layer.
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct Flow {
100    pub id: Uuid,
101    pub start_time: DateTime<Utc>,
102    pub end_time: Option<DateTime<Utc>>,
103
104    /// L3/L4 Connection Info (IP, Port, Protocol)
105    pub network: NetworkInfo,
106
107    /// The application layer protocol detected or parsed
108    pub layer: Layer,
109
110    /// Analysis tags (e.g., "error", "large-body", "injected")
111    pub tags: Vec<String>,
112
113    /// Internal processing metadata (not necessarily for UI)
114    #[serde(skip)]
115    pub meta: HashMap<String, String>,
116
117    /// P4: Resilience trace (circuit breaker, retries, budget exceeded)
118    #[serde(default, skip_serializing_if = "Option::is_none")]
119    pub resilience_trace: Option<ResilienceTrace>,
120
121    /// S11: Rule engine variables (SetVariable action results) exposed to scripts.
122    /// Cleared at end of flow lifecycle.
123    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
124    pub rule_variables: HashMap<String, String>,
125
126    /// S10a: Rule IDs that matched and executed on this flow (exposed to scripts).
127    #[serde(default, skip_serializing_if = "Vec::is_empty")]
128    pub matched_rules: Vec<String>,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct NetworkInfo {
133    pub client_ip: String,
134    pub client_port: u16,
135    pub server_ip: String,
136    pub server_port: u16,
137    pub protocol: TransportProtocol,
138    pub tls: bool,
139    pub tls_version: Option<String>,
140    pub sni: Option<String>,
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
144pub enum TransportProtocol {
145    TCP,
146    UDP,
147    ICMP,
148    Unknown,
149}
150
151/// The specific application layer data.
152/// Using an enum allows us to support HTTP now, but easily add DNS, WebSocket,
153/// or raw TCP streams later without breaking the top-level Flow structure.
154#[derive(Debug, Clone, Serialize, Deserialize)]
155#[serde(tag = "type", content = "data")]
156pub enum Layer {
157    Http(HttpLayer),
158    WebSocket(WebSocketLayer),
159    Tcp(TcpLayer),   // For raw TCP flows without L7 parsing
160    Udp(UdpLayer),   // For raw UDP flows
161    Quic(QuicLayer), // For QUIC flows
162    #[serde(
163        serialize_with = "serialize_custom_layer",
164        deserialize_with = "deserialize_custom_layer"
165    )]
166    Custom(Arc<dyn ProtocolLayer>),
167    Unknown,
168}
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct UdpLayer {
172    pub payload_size: usize,
173    pub packet_count: usize,
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize)]
177pub struct QuicLayer {
178    pub sni: Option<String>,
179    pub alpn: Option<String>,
180    pub version: Option<String>,
181}
182
183// --- HTTP Layer ---
184
185#[derive(Debug, Clone, Serialize, Deserialize)]
186pub struct HttpLayer {
187    pub request: HttpRequest,
188    pub response: Option<HttpResponse>,
189    pub error: Option<String>,
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize)]
193pub struct HttpRequest {
194    pub method: String,
195    pub url: Url,
196    pub version: String,                // HTTP/1.1, HTTP/2
197    pub headers: Vec<(String, String)>, // Vec preserves order, unlike HashMap
198    pub cookies: Vec<Cookie>,
199    pub query: Vec<(String, String)>,
200    pub body: Option<BodyData>,
201}
202
203#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct HttpResponse {
205    pub status: u16,
206    pub status_text: String,
207    pub version: String,
208    pub headers: Vec<(String, String)>,
209    pub cookies: Vec<Cookie>,
210    pub body: Option<BodyData>,
211    pub timing: ResponseTiming,
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize)]
215pub struct Cookie {
216    pub name: String,
217    pub value: String,
218    pub path: Option<String>,
219    pub domain: Option<String>,
220    pub expires: Option<String>,
221    pub http_only: Option<bool>,
222    pub secure: Option<bool>,
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
226pub struct ResponseTiming {
227    pub time_to_first_byte: Option<u64>, // ms
228    pub time_to_last_byte: Option<u64>,  // ms
229    /// TCP connect time in milliseconds for the upstream connection (new connections only).
230    pub connect_time_ms: Option<u64>,
231    /// TLS handshake time in milliseconds for the upstream connection (new connections only).
232    pub ssl_time_ms: Option<u64>,
233}
234
235// --- WebSocket Layer ---
236
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct WebSocketLayer {
239    pub handshake_request: HttpRequest,
240    pub handshake_response: HttpResponse,
241    pub messages: Vec<WebSocketMessage>,
242    pub closed: bool,
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize)]
246pub struct WebSocketMessage {
247    pub id: Uuid,
248    pub timestamp: DateTime<Utc>,
249    pub direction: Direction, // ClientToServer, ServerToClient
250    pub content: BodyData,
251    pub opcode: String, // Text, Binary, Ping, Pong
252}
253
254#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
255pub enum Direction {
256    ClientToServer,
257    ServerToClient,
258}
259
260// --- Raw TCP Layer ---
261
262#[derive(Debug, Clone, Serialize, Deserialize)]
263pub struct TcpLayer {
264    pub bytes_up: u64,
265    pub bytes_down: u64,
266    // Future: packets or data chunks
267}
268
269// --- Shared ---
270
271#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct BodyData {
273    /// Encoding (e.g., "utf-8", "base64")
274    pub encoding: String,
275    /// The actual content. If binary, it should be base64 encoded string.
276    pub content: String,
277    /// Original size in bytes before any decoding/unzipping
278    pub size: u64,
279}
280
281/// Enum for Incremental Flow Updates to avoid cloning the entire Flow object
282#[derive(Debug, Clone, Serialize, Deserialize)]
283#[serde(tag = "type", content = "data")]
284pub enum FlowUpdate {
285    /// Full Flow update (e.g., initial creation, request/response headers)
286    Full(Box<Flow>),
287    /// Incremental WebSocket Message
288    WebSocketMessage {
289        flow_id: String,
290        message: WebSocketMessage,
291    },
292    /// Incremental HTTP Body Update (Request or Response)
293    HttpBody {
294        flow_id: String,
295        direction: Direction,
296        body: BodyData,
297    },
298    /// P1: Body budget exceeded notification
299    BodyBudgetExceeded {
300        flow_id: String,
301        direction: Direction,
302    },
303}