Skip to main content

gephyr_lib/proxy/parity/
capture.rs

1use std::collections::VecDeque;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::sync::{Mutex, OnceLock};
5use std::time::{Instant, SystemTime, UNIX_EPOCH};
6
7use reqwest::header::HeaderMap;
8
9use super::ingest::canonicalize_fingerprints;
10use super::types::{
11    BodyShape, ParityCaptureStatus, ParityDiffReport, ParityExportResult, ParityRuleSet,
12    RequestFingerprint, RequestSource,
13};
14
/// Ring-buffer capacity used when a capture is started without an explicit limit.
const DEFAULT_RING_LIMIT: usize = 20_000;
/// Directory (relative path) that receives raw capture exports by default.
const DEFAULT_RAW_OUTPUT_DIR: &str = "output/parity/raw";
/// Directory (relative path) that receives redacted capture exports by default.
const DEFAULT_REDACTED_OUTPUT_DIR: &str = "output/parity/redacted";
18
19#[derive(Debug, Clone)]
20pub struct CaptureStartConfig {
21    pub ring_limit: Option<usize>,
22    pub raw_output_dir: Option<PathBuf>,
23    pub redacted_output_dir: Option<PathBuf>,
24}
25
26impl Default for CaptureStartConfig {
27    fn default() -> Self {
28        Self {
29            ring_limit: Some(DEFAULT_RING_LIMIT),
30            raw_output_dir: Some(PathBuf::from(DEFAULT_RAW_OUTPUT_DIR)),
31            redacted_output_dir: Some(PathBuf::from(DEFAULT_REDACTED_OUTPUT_DIR)),
32        }
33    }
34}
35
36#[derive(Debug)]
37struct ParityManager {
38    enabled: bool,
39    session_id: Option<String>,
40    started_at: Option<chrono::DateTime<chrono::Utc>>,
41    ring_limit: usize,
42    raw_output_dir: PathBuf,
43    redacted_output_dir: PathBuf,
44    capture_buffer: VecDeque<RequestFingerprint>,
45    latest_diff: Option<ParityDiffReport>,
46}
47
48impl Default for ParityManager {
49    fn default() -> Self {
50        Self {
51            enabled: false,
52            session_id: None,
53            started_at: None,
54            ring_limit: DEFAULT_RING_LIMIT,
55            raw_output_dir: PathBuf::from(DEFAULT_RAW_OUTPUT_DIR),
56            redacted_output_dir: PathBuf::from(DEFAULT_REDACTED_OUTPUT_DIR),
57            capture_buffer: VecDeque::new(),
58            latest_diff: None,
59        }
60    }
61}
62
63fn manager() -> &'static Mutex<ParityManager> {
64    static MANAGER: OnceLock<Mutex<ParityManager>> = OnceLock::new();
65    MANAGER.get_or_init(|| Mutex::new(ParityManager::default()))
66}
67
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_timestamp_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
75
76fn generate_session_id() -> String {
77    format!(
78        "parity-{}-{}",
79        chrono::Utc::now().format("%Y%m%d%H%M%S"),
80        uuid::Uuid::new_v4().simple()
81    )
82}
83
84pub fn start_capture(config: CaptureStartConfig) -> ParityCaptureStatus {
85    let mut lock = manager().lock().expect("parity manager lock poisoned");
86    lock.enabled = true;
87    lock.session_id = Some(generate_session_id());
88    lock.started_at = Some(chrono::Utc::now());
89    lock.ring_limit = config
90        .ring_limit
91        .unwrap_or(DEFAULT_RING_LIMIT)
92        .clamp(1, 1_000_000);
93
94    if let Some(path) = config.raw_output_dir {
95        lock.raw_output_dir = path;
96    }
97    if let Some(path) = config.redacted_output_dir {
98        lock.redacted_output_dir = path;
99    }
100
101    lock.capture_buffer.clear();
102
103    status_from_manager(&lock)
104}
105
106pub fn stop_capture() -> ParityCaptureStatus {
107    let mut lock = manager().lock().expect("parity manager lock poisoned");
108    let should_flush = lock.enabled && !lock.capture_buffer.is_empty();
109    let flush_data = if should_flush {
110        Some((
111            lock.capture_buffer.iter().cloned().collect::<Vec<_>>(),
112            lock.session_id
113                .clone()
114                .unwrap_or_else(|| "parity-no-session".to_string()),
115            lock.raw_output_dir.clone(),
116            lock.redacted_output_dir.clone(),
117        ))
118    } else {
119        None
120    };
121    lock.enabled = false;
122    let status = status_from_manager(&lock);
123    drop(lock);
124
125    if let Some((fingerprints, session_id, raw_dir, redacted_dir)) = flush_data {
126        let raw_output = default_export_path(&raw_dir, &session_id, "raw");
127        let redacted_output = default_export_path(&redacted_dir, &session_id, "redacted");
128        if let Err(e) = write_jsonl_file(&raw_output, &fingerprints) {
129            tracing::warn!(
130                "parity stop_capture raw flush failed ({}): {}",
131                raw_output.display(),
132                e
133            );
134        }
135        let redacted = canonicalize_fingerprints(&fingerprints, &ParityRuleSet::default());
136        if let Err(e) = write_jsonl_file(&redacted_output, &redacted) {
137            tracing::warn!(
138                "parity stop_capture redacted flush failed ({}): {}",
139                redacted_output.display(),
140                e
141            );
142        }
143    }
144
145    status
146}
147
148pub fn clear_capture() {
149    let mut lock = manager().lock().expect("parity manager lock poisoned");
150    lock.capture_buffer.clear();
151}
152
153pub fn capture_status() -> ParityCaptureStatus {
154    let lock = manager().lock().expect("parity manager lock poisoned");
155    status_from_manager(&lock)
156}
157
158fn status_from_manager(manager: &ParityManager) -> ParityCaptureStatus {
159    ParityCaptureStatus {
160        enabled: manager.enabled,
161        session_id: manager.session_id.clone(),
162        started_at: manager.started_at.map(|dt| dt.to_rfc3339()),
163        captured_count: manager.capture_buffer.len(),
164        ring_limit: manager.ring_limit,
165    }
166}
167
/// Returns a copy of `headers` with ASCII-lowercased names, ordered by name.
///
/// Values are copied unchanged. The sort is stable, so entries that share a
/// name keep their original relative order.
fn normalized_headers(headers: &[(String, String)]) -> Vec<(String, String)> {
    let mut pairs = Vec::with_capacity(headers.len());
    for (name, value) in headers {
        pairs.push((name.to_ascii_lowercase(), value.clone()));
    }
    pairs.sort_by(|left, right| left.0.cmp(&right.0));
    pairs
}
176
177pub fn header_map_to_pairs(headers: &HeaderMap) -> Vec<(String, String)> {
178    let mut out = Vec::new();
179    for (name, value) in headers {
180        out.push((
181            name.as_str().to_ascii_lowercase(),
182            value.to_str().unwrap_or("<non-utf8>").to_string(),
183        ));
184    }
185    out.sort_by(|a, b| a.0.cmp(&b.0));
186    out
187}
188
189/// Record outbound request fingerprint if capture is enabled.
190pub fn record_outbound_request(
191    method: &str,
192    url: &str,
193    headers: &[(String, String)],
194    body_json: Option<&serde_json::Value>,
195    latency_ms: Option<u64>,
196    status_code: Option<u16>,
197    source: RequestSource,
198) {
199    let mut lock = manager().lock().expect("parity manager lock poisoned");
200    if !lock.enabled {
201        return;
202    }
203
204    let mut fp = RequestFingerprint::new(
205        source,
206        method.to_ascii_uppercase(),
207        url.to_string(),
208        super::ingest::normalize_endpoint(url),
209        normalized_headers(headers),
210        body_json.map(BodyShape::from_value),
211        Some(now_timestamp_ms()),
212        latency_ms,
213        status_code,
214        lock.session_id.clone(),
215    );
216
217    if fp.normalized_endpoint.is_empty() {
218        fp.normalized_endpoint = super::ingest::normalize_endpoint(&fp.url);
219    }
220
221    if lock.capture_buffer.len() >= lock.ring_limit {
222        let _ = lock.capture_buffer.pop_front();
223    }
224    lock.capture_buffer.push_back(fp);
225}
226
227pub fn record_reqwest_outbound(
228    method: &str,
229    url: &str,
230    headers: &HeaderMap,
231    body_json: Option<&serde_json::Value>,
232    started_at: Instant,
233    status_code: Option<u16>,
234    source: RequestSource,
235) {
236    let latency = started_at.elapsed().as_millis() as u64;
237    let header_pairs = header_map_to_pairs(headers);
238    record_outbound_request(
239        method,
240        url,
241        &header_pairs,
242        body_json,
243        Some(latency),
244        status_code,
245        source,
246    );
247}
248
249pub fn captured_snapshot() -> Vec<RequestFingerprint> {
250    let lock = manager().lock().expect("parity manager lock poisoned");
251    lock.capture_buffer.iter().cloned().collect()
252}
253
254pub fn drain_captured() -> Vec<RequestFingerprint> {
255    let mut lock = manager().lock().expect("parity manager lock poisoned");
256    lock.capture_buffer.drain(..).collect()
257}
258
259pub fn set_latest_diff(report: ParityDiffReport) {
260    let mut lock = manager().lock().expect("parity manager lock poisoned");
261    lock.latest_diff = Some(report);
262}
263
264pub fn clear_latest_diff() {
265    let mut lock = manager().lock().expect("parity manager lock poisoned");
266    lock.latest_diff = None;
267}
268
269pub fn latest_diff() -> Option<ParityDiffReport> {
270    let lock = manager().lock().expect("parity manager lock poisoned");
271    lock.latest_diff.clone()
272}
273
/// Builds the export path `<base_dir>/<session_id>.<suffix>.jsonl`.
fn default_export_path(base_dir: &Path, session_id: &str, suffix: &str) -> PathBuf {
    let file_name = format!("{session_id}.{suffix}.jsonl");
    let mut target = base_dir.to_path_buf();
    target.push(file_name);
    target
}
277
/// Creates the parent directory of `path`, including missing ancestors.
///
/// Succeeds without touching the filesystem when `path` has no parent.
///
/// # Errors
///
/// Returns a message naming the directory and the underlying I/O error when
/// the directory cannot be created.
fn ensure_parent(path: &Path) -> Result<(), String> {
    match path.parent() {
        None => Ok(()),
        Some(dir) => std::fs::create_dir_all(dir)
            .map_err(|e| format!("failed to create {}: {}", dir.display(), e)),
    }
}
285
286fn write_jsonl_file(path: &Path, fingerprints: &[RequestFingerprint]) -> Result<(), String> {
287    ensure_parent(path)?;
288    let mut file = std::fs::File::create(path)
289        .map_err(|e| format!("failed to create {}: {}", path.display(), e))?;
290
291    for fp in fingerprints {
292        let line = serde_json::to_string(fp).map_err(|e| format!("serialize error: {}", e))?;
293        writeln!(file, "{}", line).map_err(|e| format!("write error: {}", e))?;
294    }
295
296    Ok(())
297}
298
299pub fn export_dual_artifacts(
300    raw_path: Option<&Path>,
301    redacted_path: Option<&Path>,
302    rules: Option<&ParityRuleSet>,
303) -> Result<ParityExportResult, String> {
304    let lock = manager().lock().expect("parity manager lock poisoned");
305    let fingerprints: Vec<RequestFingerprint> = lock.capture_buffer.iter().cloned().collect();
306    let session_id = lock
307        .session_id
308        .clone()
309        .unwrap_or_else(|| "parity-no-session".to_string());
310
311    let raw_output = raw_path
312        .map(|p| p.to_path_buf())
313        .unwrap_or_else(|| default_export_path(&lock.raw_output_dir, &session_id, "raw"));
314    let redacted_output = redacted_path
315        .map(|p| p.to_path_buf())
316        .unwrap_or_else(|| default_export_path(&lock.redacted_output_dir, &session_id, "redacted"));
317    drop(lock);
318
319    write_jsonl_file(&raw_output, &fingerprints)?;
320
321    let rules = rules.cloned().unwrap_or_default();
322    let canonicalized = canonicalize_fingerprints(&fingerprints, &rules);
323    write_jsonl_file(&redacted_output, &canonicalized)?;
324
325    Ok(ParityExportResult {
326        raw_path: raw_output.display().to_string(),
327        redacted_path: redacted_output.display().to_string(),
328        count: fingerprints.len(),
329        session_id: Some(session_id),
330    })
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proxy::parity::types::RequestSource;
    use serde_json::json;
    use std::sync::{Mutex, MutexGuard, OnceLock};

    /// Serialises tests that mutate the process-wide parity manager.
    ///
    /// The mutex guards no data of its own, so a poisoned lock (left behind by
    /// a test that failed an assertion while holding it) is recovered instead
    /// of turning into a second, unrelated failure in the next test.
    fn capture_test_lock() -> MutexGuard<'static, ()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Returns the global manager to a disabled state with an empty buffer.
    ///
    /// The buffer is cleared *before* stopping: `stop_capture` flushes a
    /// non-empty buffer to the configured output directories, which would
    /// leave stray artifact files behind after a test run.
    fn reset_capture_state() {
        clear_capture();
        let _ = stop_capture();
    }

    #[test]
    fn capture_is_opt_in_and_respects_ring_limit() {
        let _guard = capture_test_lock();
        reset_capture_state();

        // While capture is disabled nothing may be recorded.
        record_outbound_request(
            "GET",
            "https://example.com",
            &[],
            None,
            Some(1),
            Some(200),
            RequestSource::Gephyr,
        );
        assert!(captured_snapshot().is_empty());

        let cfg = CaptureStartConfig {
            ring_limit: Some(2),
            ..CaptureStartConfig::default()
        };
        let status = start_capture(cfg);
        assert!(status.enabled);

        for idx in 0..3 {
            record_outbound_request(
                "POST",
                &format!("https://cloudcode-pa.googleapis.com/v1internal:test{}", idx),
                &[("content-type".to_string(), "application/json".to_string())],
                Some(&json!({"i": idx})),
                Some(idx),
                Some(200),
                RequestSource::Gephyr,
            );
        }

        // Three requests into a ring of two: the oldest one is evicted.
        let snapshot = captured_snapshot();
        assert_eq!(snapshot.len(), 2);
        assert!(snapshot[0].url.contains("test1"));
        assert!(snapshot[1].url.contains("test2"));

        // Clean up global state to avoid leaking into other tests.
        reset_capture_state();
    }

    #[test]
    fn export_writes_raw_and_redacted_files() {
        let _guard = capture_test_lock();
        let _ = start_capture(CaptureStartConfig::default());
        clear_capture();
        record_outbound_request(
            "POST",
            "https://cloudcode-pa.googleapis.com/v1internal:test",
            &[("authorization".to_string(), "Bearer test".to_string())],
            Some(&json!({"x": 1})),
            Some(12),
            Some(200),
            RequestSource::Gephyr,
        );

        let tmp_dir = std::env::temp_dir().join("gephyr_parity_capture_export");
        let raw = tmp_dir.join("capture.raw.jsonl");
        let redacted = tmp_dir.join("capture.redacted.jsonl");

        let result = export_dual_artifacts(Some(&raw), Some(&redacted), None).expect("export");
        assert_eq!(result.count, 1);
        assert!(raw.exists());
        assert!(redacted.exists());

        let redacted_content = std::fs::read_to_string(&redacted).expect("read redacted");
        assert!(redacted_content.contains("<redacted>"));

        let _ = std::fs::remove_file(raw);
        let _ = std::fs::remove_file(redacted);
        let _ = std::fs::remove_dir_all(tmp_dir);

        // Clean up global state to avoid leaking into other tests. Clearing
        // first keeps `stop_capture` from flushing into the default output
        // directories in the working tree.
        reset_capture_state();
    }
}