derusted 0.2.0

Programmable HTTPS interception and traffic inspection engine for security-critical applications
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
//! HTTP/2 MITM Handler - Production-Ready Proxying
//!
//! This module provides production-grade HTTP/2 proxying with:
//! - Proper flow control (window updates)
//! - Backpressure handling (prevents memory bloat)
//! - GOAWAY/RST frame handling
//! - Stream lifecycle management
//!
//! ## Two-Layer HTTP/2 Architecture
//!
//! **Parser layer** (`http2_parser.rs`): Low-level frame parsing for inspection/logging only.
//! - Manual frame parsing
//! - HPACK decompression
//! - Request/response extraction
//! - **No flow control or backpressure**
//!
//! **MITM layer** (`http2_mitm.rs`): Production HTTP/2 proxying using `h2` crate.
//! - Automatic flow control
//! - Backpressure via reserve_capacity()
//! - Per-stream concurrency
//! - Battle-tested (used by hyper)
//!
//! ## Limitations (v0.1.0)
//!
//! - **Server push NOT supported**: Push promise frames are silently dropped
//! - **Stream priorities NOT enforced**: Priorities forwarded but not scheduled
//! - **No HTTP/2 upgrade**: Only ALPN negotiation supported

use crate::mitm::{LogStorage, LoggingPolicy, PiiRedactor, RequestMetadata};
use bytes::Bytes;
use h2::RecvStream;
use http::{Request, Response};
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_rustls::{client::TlsStream as ClientTlsStream, server::TlsStream as ServerTlsStream};
use tracing::{debug, error, info, warn};

/// HTTP/2 MITM configuration
#[derive(Debug, Clone)]
pub struct Http2Config {
    /// Initial flow control window (per stream) - default: 64KB
    pub initial_window_size: u32,

    /// Connection-level window - default: 1MB
    pub initial_connection_window_size: u32,

    /// Max concurrent streams (DoS protection) - default: 100
    pub max_concurrent_streams: u32,

    /// Max frame size - default: 16KB
    pub max_frame_size: u32,

    /// Enable server push (always false in v0.1.0)
    pub enable_server_push: bool,
}

impl Default for Http2Config {
    fn default() -> Self {
        Self {
            initial_window_size: 65535,                // 64KB
            initial_connection_window_size: 1_048_576, // 1MB
            max_concurrent_streams: 100,
            max_frame_size: 16384,     // 16KB
            enable_server_push: false, // Not supported in v0.1.0
        }
    }
}

/// Handle HTTP/2 MITM connection with production-grade flow control
///
/// This function:
/// 1. Establishes HTTP/2 server handshake with client
/// 2. Establishes HTTP/2 client connection to upstream
/// 3. Proxies streams with proper flow control and backpressure
/// 4. Logs request/response metadata to SQLite
///
/// ## Parameters
/// - `client_tls`: TLS stream from browser/client
/// - `upstream_tls`: TLS stream to target server
/// - `target_host`: Hostname for logging
/// - `target_port`: Port for logging
/// - `logging_policy`: PII redaction and sampling config
/// - `log_storage`: Optional SQLite storage
/// - `config`: HTTP/2 configuration (window sizes, limits)
///
/// ## Returns
/// - `Ok(())` when connection closes gracefully
/// - `Err(H2Error)` on fatal errors (handshake failure, connection error)
pub async fn handle_http2_mitm(
    client_tls: ServerTlsStream<TcpStream>,
    upstream_tls: ClientTlsStream<TcpStream>,
    target_host: String,
    target_port: u16,
    logging_policy: Arc<LoggingPolicy>,
    log_storage: Option<Arc<LogStorage>>,
    config: Http2Config,
) -> Result<(), H2Error> {
    debug!(
        target_host = %target_host,
        target_port = target_port,
        "Starting HTTP/2 MITM connection"
    );

    // 1. H2 server handshake (client-facing)
    let mut client_h2 = h2::server::Builder::new()
        .initial_window_size(config.initial_window_size)
        .initial_connection_window_size(config.initial_connection_window_size)
        .max_concurrent_streams(config.max_concurrent_streams)
        .max_frame_size(config.max_frame_size)
        .handshake(client_tls)
        .await
        .map_err(H2Error::ClientHandshakeFailed)?;

    debug!("HTTP/2 client handshake complete");

    // 2. H2 client handshake (upstream-facing)
    let (upstream_send_request, connection) = h2::client::Builder::new()
        .initial_window_size(config.initial_window_size)
        .initial_connection_window_size(config.initial_connection_window_size)
        .max_frame_size(config.max_frame_size)
        .handshake(upstream_tls)
        .await
        .map_err(H2Error::UpstreamHandshakeFailed)?;

    debug!("HTTP/2 upstream handshake complete");

    // 3. Drive upstream connection in background (required by h2 client)
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            error!(error = ?e, "HTTP/2 upstream connection error");
        }
    });

    // 4. Accept and proxy streams
    let mut stream_count = 0u64;
    while let Some(result) = client_h2.accept().await {
        match result {
            Ok((request, respond)) => {
                stream_count += 1;
                let stream_id = stream_count;

                debug!(stream_id, "Accepted HTTP/2 stream");

                // Clone for task
                let upstream_clone = upstream_send_request.clone();
                let target_host_clone = target_host.clone();
                let logging_policy_clone = Arc::clone(&logging_policy);
                let log_storage_clone = log_storage.clone();

                // Spawn task per stream (concurrency)
                tokio::spawn(async move {
                    let start = Instant::now();
                    match proxy_h2_stream(
                        request,
                        respond,
                        upstream_clone,
                        target_host_clone,
                        target_port,
                        logging_policy_clone,
                        log_storage_clone,
                    )
                    .await
                    {
                        Ok(()) => {
                            debug!(
                                stream_id,
                                duration_ms = start.elapsed().as_millis(),
                                "HTTP/2 stream completed"
                            );
                        }
                        Err(e) => {
                            error!(stream_id, error = ?e, "HTTP/2 stream error");
                        }
                    }
                });
            }
            Err(e) => {
                // Check error reason
                if let Some(reason) = e.reason() {
                    // Check for graceful shutdown reasons
                    if reason == h2::Reason::NO_ERROR {
                        info!("HTTP/2 graceful shutdown (NO_ERROR)");
                        break;
                    } else {
                        warn!(reason = ?reason, "HTTP/2 stream error, continuing");
                        // Continue accepting other streams
                        continue;
                    }
                } else {
                    error!(error = ?e, "HTTP/2 accept error without reason");
                    return Err(H2Error::AcceptFailed(e.to_string()));
                }
            }
        }
    }

    info!(
        target_host = %target_host,
        stream_count,
        "HTTP/2 MITM connection closed"
    );

    Ok(())
}

/// Proxy a single HTTP/2 stream with flow control
///
/// This function:
/// 1. Forwards request (headers + body) from client to upstream
/// 2. Receives response from upstream
/// 3. Forwards response (headers + body) back to client
/// 4. Applies backpressure via reserve_capacity()
/// 5. Releases flow control windows after sending
/// 6. Logs complete request/response to SQLite
async fn proxy_h2_stream(
    request: Request<RecvStream>,
    mut client_respond: h2::server::SendResponse<Bytes>,
    mut upstream: h2::client::SendRequest<Bytes>,
    target_host: String,
    target_port: u16,
    logging_policy: Arc<LoggingPolicy>,
    log_storage: Option<Arc<LogStorage>>,
) -> Result<(), H2Error> {
    let request_start = Instant::now();

    // 1. Extract request parts
    let (parts, mut recv_body) = request.into_parts();
    let method = parts.method.to_string();
    let path = parts
        .uri
        .path_and_query()
        .map(|pq| pq.as_str())
        .unwrap_or("/")
        .to_string();

    debug!(method = %method, path = %path, "Proxying HTTP/2 request");

    // 2. Send request headers to upstream
    let (response_future, mut send_body) = upstream
        .send_request(Request::from_parts(parts, ()), false)
        .map_err(|e| H2Error::UpstreamRequestFailed(e.to_string()))?;

    // 3. Stream request body (client → upstream) in background
    tokio::spawn(async move {
        let mut request_size = 0u64;

        while let Some(chunk_result) = recv_body.data().await {
            match chunk_result {
                Ok(data) => {
                    request_size += data.len() as u64;

                    // Send to upstream with backpressure
                    send_body.reserve_capacity(data.len());

                    if let Err(e) = send_body.send_data(data.clone(), false) {
                        error!(error = ?e, "Failed to send request body chunk");
                        break;
                    }

                    // Release client flow control window
                    if let Err(e) = recv_body.flow_control().release_capacity(data.len()) {
                        error!(error = ?e, "Failed to release client flow control");
                        break;
                    }
                }
                Err(e) => {
                    error!(error = ?e, "Request body read error");
                    break;
                }
            }
        }

        // Signal end of request
        let _ = send_body.send_data(Bytes::new(), true);

        debug!(request_size, "Request body forwarded");
    });

    // 4. Wait for response headers
    let response = response_future
        .await
        .map_err(|e| H2Error::UpstreamResponseFailed(e.to_string()))?;

    let (parts, mut upstream_body) = response.into_parts();
    let status = parts.status.as_u16();

    debug!(status, "Received HTTP/2 response");

    // 5. Send response headers to client
    let mut client_body = client_respond
        .send_response(Response::from_parts(parts, ()), false)
        .map_err(|e| H2Error::ClientResponseFailed(e.to_string()))?;

    // 6. Stream response body (upstream → client) with backpressure
    let mut response_size = 0u64;

    while let Some(chunk_result) = upstream_body.data().await {
        match chunk_result {
            Ok(data) => {
                response_size += data.len() as u64;

                // Wait for client capacity (backpressure)
                client_body.reserve_capacity(data.len());

                // Send to client
                if let Err(e) = client_body.send_data(data.clone(), false) {
                    error!(error = ?e, "Failed to send response body chunk");
                    break;
                }

                // Release upstream flow control window
                if let Err(e) = upstream_body.flow_control().release_capacity(data.len()) {
                    error!(error = ?e, "Failed to release upstream flow control");
                    break;
                }
            }
            Err(e) => {
                error!(error = ?e, "Response body read error");
                break;
            }
        }
    }

    // 7. Signal end of response
    let _ = client_body.send_data(Bytes::new(), true);

    let duration_ms = request_start.elapsed().as_millis() as u64;

    debug!(response_size, duration_ms, "Response body forwarded");

    // 8. Log complete request/response
    if let Some(storage) = log_storage {
        let mut metadata = RequestMetadata {
            timestamp: chrono::Utc::now().timestamp(),
            host: target_host,
            port: target_port,
            method,
            path: path.clone(),
            http_version: "HTTP/2".to_string(),
            status_code: Some(status),
            request_size: 0, // TODO: Track in body forwarding task
            response_size: response_size as usize,
            duration_ms,
            tls_version: Some("TLS 1.3".to_string()),
            mitm_applied: true,
            bypass_reason: None,
        };

        // Apply PII redaction if enabled
        if logging_policy.enable_pii_redaction {
            metadata.path = PiiRedactor::redact(&metadata.path);
        }

        // Fire-and-forget logging
        let storage_clone = Arc::clone(&storage);
        tokio::spawn(async move {
            if let Err(e) = storage_clone.log_request(&metadata).await {
                warn!(error = %e, "Failed to log HTTP/2 request");
            }
        });
    }

    Ok(())
}

/// HTTP/2 MITM errors
#[derive(Debug, Error)]
pub enum H2Error {
    #[error("Client handshake failed: {0}")]
    ClientHandshakeFailed(#[source] h2::Error),

    #[error("Upstream handshake failed: {0}")]
    UpstreamHandshakeFailed(#[source] h2::Error),

    #[error("Accept stream failed: {0}")]
    AcceptFailed(String),

    #[error("Upstream request failed: {0}")]
    UpstreamRequestFailed(String),

    #[error("Upstream response failed: {0}")]
    UpstreamResponseFailed(String),

    #[error("Client response failed: {0}")]
    ClientResponseFailed(String),

    #[error("Stream reset: {0:?}")]
    StreamReset(h2::Reason),
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_http2_config_defaults() {
        let config = Http2Config::default();

        assert_eq!(config.initial_window_size, 65535);
        assert_eq!(config.initial_connection_window_size, 1_048_576);
        assert_eq!(config.max_concurrent_streams, 100);
        assert_eq!(config.max_frame_size, 16384);
        assert_eq!(config.enable_server_push, false);
    }

    #[test]
    fn test_http2_config_custom() {
        let config = Http2Config {
            initial_window_size: 131072, // 128KB
            max_concurrent_streams: 200,
            ..Default::default()
        };

        assert_eq!(config.initial_window_size, 131072);
        assert_eq!(config.max_concurrent_streams, 200);
    }
}