Skip to main content

relay_core_api/
flow.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Deserializer, Serialize, Serializer};
3use std::any::Any;
4use std::collections::HashMap;
5use std::sync::Arc;
6use url::Url;
7use uuid::Uuid;
8
9/// Trait for custom protocol layers to implement
10pub trait ProtocolLayer: Any + Send + Sync + std::fmt::Debug {
11    fn protocol_name(&self) -> &str;
12    fn as_any(&self) -> &dyn Any;
13    fn to_json(&self) -> serde_json::Value;
14}
15
16// Custom serializer for Arc<dyn ProtocolLayer>
17fn serialize_custom_layer<S>(
18    layer: &Arc<dyn ProtocolLayer>,
19    serializer: S,
20) -> Result<S::Ok, S::Error>
21where
22    S: Serializer,
23{
24    use serde::ser::SerializeStruct;
25    let mut state = serializer.serialize_struct("CustomLayer", 2)?;
26    state.serialize_field("protocol", layer.protocol_name())?;
27    state.serialize_field("data", &layer.to_json())?;
28    state.end()
29}
30
31// Custom deserializer for Arc<dyn ProtocolLayer>
32fn deserialize_custom_layer<'de, D>(deserializer: D) -> Result<Arc<dyn ProtocolLayer>, D::Error>
33where
34    D: Deserializer<'de>,
35{
36    let value = serde_json::Value::deserialize(deserializer)?;
37
38    // Try to extract protocol and data
39    let protocol = value
40        .get("protocol")
41        .and_then(|v| v.as_str())
42        .unwrap_or("unknown")
43        .to_string();
44
45    let data = value
46        .get("data")
47        .cloned()
48        .unwrap_or(serde_json::Value::Null);
49
50    Ok(Arc::new(GenericProtocolLayer { protocol, data }))
51}
52
53#[derive(Debug)]
54struct GenericProtocolLayer {
55    protocol: String,
56    data: serde_json::Value,
57}
58
59impl ProtocolLayer for GenericProtocolLayer {
60    fn protocol_name(&self) -> &str {
61        &self.protocol
62    }
63    fn as_any(&self) -> &dyn Any {
64        self
65    }
66    fn to_json(&self) -> serde_json::Value {
67        self.data.clone()
68    }
69}
70
71/// P4: Structured trace of resilience-related events during a proxy request.
72/// Not yet attached to flow struct — will be wired in P4b via HttpLayer.
73#[derive(Debug, Clone, Serialize, Deserialize, Default)]
74pub struct ResilienceTrace {
75    /// Number of retry attempts for this request.
76    #[serde(default)]
77    pub retry_count: u32,
78    /// Whether the circuit breaker was open when this request was attempted.
79    #[serde(default)]
80    pub circuit_open: bool,
81    /// Whether the request body budget was exceeded (rules skipped).
82    #[serde(default)]
83    pub budget_exceeded: bool,
84    /// Upstream errors encountered (non-2xx, connection errors).
85    #[serde(default, skip_serializing_if = "Vec::is_empty")]
86    pub upstream_errors: Vec<String>,
87}
88
89/// The central data structure representing a captured traffic flow.
90/// Designed to support L3/L4 layers initially, with L7 (HTTP) as a specialized layer.
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct Flow {
93    pub id: Uuid,
94    pub start_time: DateTime<Utc>,
95    pub end_time: Option<DateTime<Utc>>,
96
97    /// L3/L4 Connection Info (IP, Port, Protocol)
98    pub network: NetworkInfo,
99
100    /// The application layer protocol detected or parsed
101    pub layer: Layer,
102
103    /// Analysis tags (e.g., "error", "large-body", "injected")
104    pub tags: Vec<String>,
105
106    /// Internal processing metadata (not necessarily for UI)
107    #[serde(skip)]
108    pub meta: HashMap<String, String>,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct NetworkInfo {
113    pub client_ip: String,
114    pub client_port: u16,
115    pub server_ip: String,
116    pub server_port: u16,
117    pub protocol: TransportProtocol,
118    pub tls: bool,
119    pub tls_version: Option<String>,
120    pub sni: Option<String>,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
124pub enum TransportProtocol {
125    TCP,
126    UDP,
127    ICMP,
128    Unknown,
129}
130
131/// The specific application layer data.
132/// Using an enum allows us to support HTTP now, but easily add DNS, WebSocket,
133/// or raw TCP streams later without breaking the top-level Flow structure.
134#[derive(Debug, Clone, Serialize, Deserialize)]
135#[serde(tag = "type", content = "data")]
136pub enum Layer {
137    Http(HttpLayer),
138    WebSocket(WebSocketLayer),
139    Tcp(TcpLayer),   // For raw TCP flows without L7 parsing
140    Udp(UdpLayer),   // For raw UDP flows
141    Quic(QuicLayer), // For QUIC flows
142    #[serde(
143        serialize_with = "serialize_custom_layer",
144        deserialize_with = "deserialize_custom_layer"
145    )]
146    Custom(Arc<dyn ProtocolLayer>),
147    Unknown,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct UdpLayer {
152    pub payload_size: usize,
153    pub packet_count: usize,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct QuicLayer {
158    pub sni: Option<String>,
159    pub alpn: Option<String>,
160    pub version: Option<String>,
161}
162
163// --- HTTP Layer ---
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct HttpLayer {
167    pub request: HttpRequest,
168    pub response: Option<HttpResponse>,
169    pub error: Option<String>,
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct HttpRequest {
174    pub method: String,
175    pub url: Url,
176    pub version: String,                // HTTP/1.1, HTTP/2
177    pub headers: Vec<(String, String)>, // Vec preserves order, unlike HashMap
178    pub cookies: Vec<Cookie>,
179    pub query: Vec<(String, String)>,
180    pub body: Option<BodyData>,
181}
182
183#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct HttpResponse {
185    pub status: u16,
186    pub status_text: String,
187    pub version: String,
188    pub headers: Vec<(String, String)>,
189    pub cookies: Vec<Cookie>,
190    pub body: Option<BodyData>,
191    pub timing: ResponseTiming,
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize)]
195pub struct Cookie {
196    pub name: String,
197    pub value: String,
198    pub path: Option<String>,
199    pub domain: Option<String>,
200    pub expires: Option<String>,
201    pub http_only: Option<bool>,
202    pub secure: Option<bool>,
203}
204
205#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct ResponseTiming {
207    pub time_to_first_byte: Option<u64>, // ms
208    pub time_to_last_byte: Option<u64>,  // ms
209    /// TCP connect time in milliseconds for the upstream connection (new connections only).
210    pub connect_time_ms: Option<u64>,
211    /// TLS handshake time in milliseconds for the upstream connection (new connections only).
212    pub ssl_time_ms: Option<u64>,
213}
214
215// --- WebSocket Layer ---
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
218pub struct WebSocketLayer {
219    pub handshake_request: HttpRequest,
220    pub handshake_response: HttpResponse,
221    pub messages: Vec<WebSocketMessage>,
222    pub closed: bool,
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
226pub struct WebSocketMessage {
227    pub id: Uuid,
228    pub timestamp: DateTime<Utc>,
229    pub direction: Direction, // ClientToServer, ServerToClient
230    pub content: BodyData,
231    pub opcode: String, // Text, Binary, Ping, Pong
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
235pub enum Direction {
236    ClientToServer,
237    ServerToClient,
238}
239
240// --- Raw TCP Layer ---
241
242#[derive(Debug, Clone, Serialize, Deserialize)]
243pub struct TcpLayer {
244    pub bytes_up: u64,
245    pub bytes_down: u64,
246    // Future: packets or data chunks
247}
248
249// --- Shared ---
250
251#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct BodyData {
253    /// Encoding (e.g., "utf-8", "base64")
254    pub encoding: String,
255    /// The actual content. If binary, it should be base64 encoded string.
256    pub content: String,
257    /// Original size in bytes before any decoding/unzipping
258    pub size: u64,
259}
260
261/// Enum for Incremental Flow Updates to avoid cloning the entire Flow object
262#[derive(Debug, Clone, Serialize, Deserialize)]
263#[serde(tag = "type", content = "data")]
264pub enum FlowUpdate {
265    /// Full Flow update (e.g., initial creation, request/response headers)
266    Full(Box<Flow>),
267    /// Incremental WebSocket Message
268    WebSocketMessage {
269        flow_id: String,
270        message: WebSocketMessage,
271    },
272    /// Incremental HTTP Body Update (Request or Response)
273    HttpBody {
274        flow_id: String,
275        direction: Direction,
276        body: BodyData,
277    },
278}