1use std::collections::VecDeque;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::sync::{Mutex, OnceLock};
5use std::time::{Instant, SystemTime, UNIX_EPOCH};
6
7use reqwest::header::HeaderMap;
8
9use super::ingest::canonicalize_fingerprints;
10use super::types::{
11 BodyShape, ParityCaptureStatus, ParityDiffReport, ParityExportResult, ParityRuleSet,
12 RequestFingerprint, RequestSource,
13};
14
/// Ring-buffer capacity used when a capture config does not supply one.
const DEFAULT_RING_LIMIT: usize = 20_000;
/// Default directory for raw (non-canonicalized) JSONL exports.
const DEFAULT_RAW_OUTPUT_DIR: &str = "output/parity/raw";
/// Default directory for redacted (canonicalized) JSONL exports.
const DEFAULT_REDACTED_OUTPUT_DIR: &str = "output/parity/redacted";
18
19#[derive(Debug, Clone)]
20pub struct CaptureStartConfig {
21 pub ring_limit: Option<usize>,
22 pub raw_output_dir: Option<PathBuf>,
23 pub redacted_output_dir: Option<PathBuf>,
24}
25
26impl Default for CaptureStartConfig {
27 fn default() -> Self {
28 Self {
29 ring_limit: Some(DEFAULT_RING_LIMIT),
30 raw_output_dir: Some(PathBuf::from(DEFAULT_RAW_OUTPUT_DIR)),
31 redacted_output_dir: Some(PathBuf::from(DEFAULT_REDACTED_OUTPUT_DIR)),
32 }
33 }
34}
35
36#[derive(Debug)]
37struct ParityManager {
38 enabled: bool,
39 session_id: Option<String>,
40 started_at: Option<chrono::DateTime<chrono::Utc>>,
41 ring_limit: usize,
42 raw_output_dir: PathBuf,
43 redacted_output_dir: PathBuf,
44 capture_buffer: VecDeque<RequestFingerprint>,
45 latest_diff: Option<ParityDiffReport>,
46}
47
48impl Default for ParityManager {
49 fn default() -> Self {
50 Self {
51 enabled: false,
52 session_id: None,
53 started_at: None,
54 ring_limit: DEFAULT_RING_LIMIT,
55 raw_output_dir: PathBuf::from(DEFAULT_RAW_OUTPUT_DIR),
56 redacted_output_dir: PathBuf::from(DEFAULT_REDACTED_OUTPUT_DIR),
57 capture_buffer: VecDeque::new(),
58 latest_diff: None,
59 }
60 }
61}
62
63fn manager() -> &'static Mutex<ParityManager> {
64 static MANAGER: OnceLock<Mutex<ParityManager>> = OnceLock::new();
65 MANAGER.get_or_init(|| Mutex::new(ParityManager::default()))
66}
67
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_timestamp_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
75
76fn generate_session_id() -> String {
77 format!(
78 "parity-{}-{}",
79 chrono::Utc::now().format("%Y%m%d%H%M%S"),
80 uuid::Uuid::new_v4().simple()
81 )
82}
83
84pub fn start_capture(config: CaptureStartConfig) -> ParityCaptureStatus {
85 let mut lock = manager().lock().expect("parity manager lock poisoned");
86 lock.enabled = true;
87 lock.session_id = Some(generate_session_id());
88 lock.started_at = Some(chrono::Utc::now());
89 lock.ring_limit = config
90 .ring_limit
91 .unwrap_or(DEFAULT_RING_LIMIT)
92 .clamp(1, 1_000_000);
93
94 if let Some(path) = config.raw_output_dir {
95 lock.raw_output_dir = path;
96 }
97 if let Some(path) = config.redacted_output_dir {
98 lock.redacted_output_dir = path;
99 }
100
101 lock.capture_buffer.clear();
102
103 status_from_manager(&lock)
104}
105
106pub fn stop_capture() -> ParityCaptureStatus {
107 let mut lock = manager().lock().expect("parity manager lock poisoned");
108 let should_flush = lock.enabled && !lock.capture_buffer.is_empty();
109 let flush_data = if should_flush {
110 Some((
111 lock.capture_buffer.iter().cloned().collect::<Vec<_>>(),
112 lock.session_id
113 .clone()
114 .unwrap_or_else(|| "parity-no-session".to_string()),
115 lock.raw_output_dir.clone(),
116 lock.redacted_output_dir.clone(),
117 ))
118 } else {
119 None
120 };
121 lock.enabled = false;
122 let status = status_from_manager(&lock);
123 drop(lock);
124
125 if let Some((fingerprints, session_id, raw_dir, redacted_dir)) = flush_data {
126 let raw_output = default_export_path(&raw_dir, &session_id, "raw");
127 let redacted_output = default_export_path(&redacted_dir, &session_id, "redacted");
128 if let Err(e) = write_jsonl_file(&raw_output, &fingerprints) {
129 tracing::warn!(
130 "parity stop_capture raw flush failed ({}): {}",
131 raw_output.display(),
132 e
133 );
134 }
135 let redacted = canonicalize_fingerprints(&fingerprints, &ParityRuleSet::default());
136 if let Err(e) = write_jsonl_file(&redacted_output, &redacted) {
137 tracing::warn!(
138 "parity stop_capture redacted flush failed ({}): {}",
139 redacted_output.display(),
140 e
141 );
142 }
143 }
144
145 status
146}
147
148pub fn clear_capture() {
149 let mut lock = manager().lock().expect("parity manager lock poisoned");
150 lock.capture_buffer.clear();
151}
152
153pub fn capture_status() -> ParityCaptureStatus {
154 let lock = manager().lock().expect("parity manager lock poisoned");
155 status_from_manager(&lock)
156}
157
158fn status_from_manager(manager: &ParityManager) -> ParityCaptureStatus {
159 ParityCaptureStatus {
160 enabled: manager.enabled,
161 session_id: manager.session_id.clone(),
162 started_at: manager.started_at.map(|dt| dt.to_rfc3339()),
163 captured_count: manager.capture_buffer.len(),
164 ring_limit: manager.ring_limit,
165 }
166}
167
/// Returns a copy of `headers` with ASCII-lowercased names, ordered by name.
///
/// Values are copied unchanged. The sort is stable, so entries that share a
/// name keep their original relative order.
fn normalized_headers(headers: &[(String, String)]) -> Vec<(String, String)> {
    let mut pairs = Vec::with_capacity(headers.len());
    for (name, value) in headers {
        pairs.push((name.to_ascii_lowercase(), value.clone()));
    }
    pairs.sort_by(|(left, _), (right, _)| left.cmp(right));
    pairs
}
176
177pub fn header_map_to_pairs(headers: &HeaderMap) -> Vec<(String, String)> {
178 let mut out = Vec::new();
179 for (name, value) in headers {
180 out.push((
181 name.as_str().to_ascii_lowercase(),
182 value.to_str().unwrap_or("<non-utf8>").to_string(),
183 ));
184 }
185 out.sort_by(|a, b| a.0.cmp(&b.0));
186 out
187}
188
189pub fn record_outbound_request(
191 method: &str,
192 url: &str,
193 headers: &[(String, String)],
194 body_json: Option<&serde_json::Value>,
195 latency_ms: Option<u64>,
196 status_code: Option<u16>,
197 source: RequestSource,
198) {
199 let mut lock = manager().lock().expect("parity manager lock poisoned");
200 if !lock.enabled {
201 return;
202 }
203
204 let mut fp = RequestFingerprint::new(
205 source,
206 method.to_ascii_uppercase(),
207 url.to_string(),
208 super::ingest::normalize_endpoint(url),
209 normalized_headers(headers),
210 body_json.map(BodyShape::from_value),
211 Some(now_timestamp_ms()),
212 latency_ms,
213 status_code,
214 lock.session_id.clone(),
215 );
216
217 if fp.normalized_endpoint.is_empty() {
218 fp.normalized_endpoint = super::ingest::normalize_endpoint(&fp.url);
219 }
220
221 if lock.capture_buffer.len() >= lock.ring_limit {
222 let _ = lock.capture_buffer.pop_front();
223 }
224 lock.capture_buffer.push_back(fp);
225}
226
227pub fn record_reqwest_outbound(
228 method: &str,
229 url: &str,
230 headers: &HeaderMap,
231 body_json: Option<&serde_json::Value>,
232 started_at: Instant,
233 status_code: Option<u16>,
234 source: RequestSource,
235) {
236 let latency = started_at.elapsed().as_millis() as u64;
237 let header_pairs = header_map_to_pairs(headers);
238 record_outbound_request(
239 method,
240 url,
241 &header_pairs,
242 body_json,
243 Some(latency),
244 status_code,
245 source,
246 );
247}
248
249pub fn captured_snapshot() -> Vec<RequestFingerprint> {
250 let lock = manager().lock().expect("parity manager lock poisoned");
251 lock.capture_buffer.iter().cloned().collect()
252}
253
254pub fn drain_captured() -> Vec<RequestFingerprint> {
255 let mut lock = manager().lock().expect("parity manager lock poisoned");
256 lock.capture_buffer.drain(..).collect()
257}
258
259pub fn set_latest_diff(report: ParityDiffReport) {
260 let mut lock = manager().lock().expect("parity manager lock poisoned");
261 lock.latest_diff = Some(report);
262}
263
264pub fn clear_latest_diff() {
265 let mut lock = manager().lock().expect("parity manager lock poisoned");
266 lock.latest_diff = None;
267}
268
269pub fn latest_diff() -> Option<ParityDiffReport> {
270 let lock = manager().lock().expect("parity manager lock poisoned");
271 lock.latest_diff.clone()
272}
273
/// Builds `<base_dir>/<session_id>.<suffix>.jsonl`.
fn default_export_path(base_dir: &Path, session_id: &str, suffix: &str) -> PathBuf {
    let file_name = format!("{}.{}.jsonl", session_id, suffix);
    let mut target = base_dir.to_path_buf();
    target.push(file_name);
    target
}
277
/// Creates the parent directory of `path` (and any missing ancestors).
///
/// Succeeds without doing anything when `path` has no parent component.
///
/// # Errors
///
/// Returns `failed to create <parent>: <io error>` when the directory cannot
/// be created.
fn ensure_parent(path: &Path) -> Result<(), String> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };

    match std::fs::create_dir_all(parent) {
        Ok(()) => Ok(()),
        Err(e) => Err(format!("failed to create {}: {}", parent.display(), e)),
    }
}
285
286fn write_jsonl_file(path: &Path, fingerprints: &[RequestFingerprint]) -> Result<(), String> {
287 ensure_parent(path)?;
288 let mut file = std::fs::File::create(path)
289 .map_err(|e| format!("failed to create {}: {}", path.display(), e))?;
290
291 for fp in fingerprints {
292 let line = serde_json::to_string(fp).map_err(|e| format!("serialize error: {}", e))?;
293 writeln!(file, "{}", line).map_err(|e| format!("write error: {}", e))?;
294 }
295
296 Ok(())
297}
298
299pub fn export_dual_artifacts(
300 raw_path: Option<&Path>,
301 redacted_path: Option<&Path>,
302 rules: Option<&ParityRuleSet>,
303) -> Result<ParityExportResult, String> {
304 let lock = manager().lock().expect("parity manager lock poisoned");
305 let fingerprints: Vec<RequestFingerprint> = lock.capture_buffer.iter().cloned().collect();
306 let session_id = lock
307 .session_id
308 .clone()
309 .unwrap_or_else(|| "parity-no-session".to_string());
310
311 let raw_output = raw_path
312 .map(|p| p.to_path_buf())
313 .unwrap_or_else(|| default_export_path(&lock.raw_output_dir, &session_id, "raw"));
314 let redacted_output = redacted_path
315 .map(|p| p.to_path_buf())
316 .unwrap_or_else(|| default_export_path(&lock.redacted_output_dir, &session_id, "redacted"));
317 drop(lock);
318
319 write_jsonl_file(&raw_output, &fingerprints)?;
320
321 let rules = rules.cloned().unwrap_or_default();
322 let canonicalized = canonicalize_fingerprints(&fingerprints, &rules);
323 write_jsonl_file(&redacted_output, &canonicalized)?;
324
325 Ok(ParityExportResult {
326 raw_path: raw_output.display().to_string(),
327 redacted_path: redacted_output.display().to_string(),
328 count: fingerprints.len(),
329 session_id: Some(session_id),
330 })
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proxy::parity::types::RequestSource;
    use serde_json::json;
    use std::sync::{Mutex, MutexGuard, OnceLock};

    /// Serializes tests that touch the process-wide capture manager.
    ///
    /// A poisoned lock is recovered rather than unwrapped: the guarded value
    /// is `()`, so there is no state to corrupt, and one failing test should
    /// not make the others fail with an unrelated lock error.
    fn capture_test_lock() -> MutexGuard<'static, ()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn capture_is_opt_in_and_respects_ring_limit() {
        let _guard = capture_test_lock();
        // Clear first so that `stop_capture` has nothing to flush to disk.
        clear_capture();
        stop_capture();
        record_outbound_request(
            "GET",
            "https://example.com",
            &[],
            None,
            Some(1),
            Some(200),
            RequestSource::Gephyr,
        );
        assert!(captured_snapshot().is_empty());

        let cfg = CaptureStartConfig {
            ring_limit: Some(2),
            ..CaptureStartConfig::default()
        };
        let status = start_capture(cfg);
        assert!(status.enabled);

        for idx in 0..3 {
            record_outbound_request(
                "POST",
                &format!("https://cloudcode-pa.googleapis.com/v1internal:test{}", idx),
                &[("content-type".to_string(), "application/json".to_string())],
                Some(&json!({"i": idx})),
                Some(idx),
                Some(200),
                RequestSource::Gephyr,
            );
        }

        let snapshot = captured_snapshot();
        assert_eq!(snapshot.len(), 2);
        assert!(snapshot[0].url.contains("test1"));
        assert!(snapshot[1].url.contains("test2"));

        // Leave the global manager disabled and empty. Clearing before
        // stopping avoids the flush that `stop_capture` performs on a
        // non-empty buffer.
        clear_capture();
        stop_capture();
    }

    #[test]
    fn export_writes_raw_and_redacted_files() {
        let _guard = capture_test_lock();
        let _ = start_capture(CaptureStartConfig::default());
        clear_capture();
        record_outbound_request(
            "POST",
            "https://cloudcode-pa.googleapis.com/v1internal:test",
            &[("authorization".to_string(), "Bearer test".to_string())],
            Some(&json!({"x": 1})),
            Some(12),
            Some(200),
            RequestSource::Gephyr,
        );

        // Include the process id so concurrent test processes do not share
        // (and delete) each other's files.
        let tmp_dir = std::env::temp_dir().join(format!(
            "gephyr_parity_capture_export_{}",
            std::process::id()
        ));
        let raw = tmp_dir.join("capture.raw.jsonl");
        let redacted = tmp_dir.join("capture.redacted.jsonl");

        let result = export_dual_artifacts(Some(&raw), Some(&redacted), None).expect("export");
        assert_eq!(result.count, 1);
        assert!(raw.exists());
        assert!(redacted.exists());

        let redacted_content = std::fs::read_to_string(&redacted).expect("read redacted");
        assert!(redacted_content.contains("<redacted>"));

        let _ = std::fs::remove_dir_all(tmp_dir);

        // Clear BEFORE stopping: `stop_capture` flushes a non-empty buffer to
        // the configured output directories, which here are the relative
        // defaults (`output/parity/...`) and would litter the working tree.
        clear_capture();
        stop_capture();
    }
}