Skip to main content

osproxy_capture/
lib.rs

//! Full-fidelity traffic capture (tenant-agnostic).
//!
//! A capture proxy forwards each request to the upstream while recording the raw
//! request and response to a durable stream, so that a replayer can later apply
//! that stream to another cluster (the model is the OpenSearch Migration
//! Assistant capture proxy). This crate is the seam: [`Capture`] receives each
//! exchange, and a queue writer (the `osproxy-kafka` crate), a file recorder, or
//! the in-memory [`MemoryCapture`] implements it. The crate pulls in no broker
//! dependency, so the seam can be implemented from a leaf crate without dragging
//! a Kafka client into the default build.
//!
//! **This is not the shape-only telemetry.** Everything else osproxy records
//! consists of shapes, ids and names, and is safe to expose by construction. A
//! capture record carries the raw bodies and header *values*: tenant data, and
//! any credential a redaction layer did not strip. The capture stream is a
//! privileged channel: secure it (encryption, access control), and enable it
//! deliberately, never by default. Redaction is composed in via
//! [`RedactingCapture`] rather than baked into every recorder.
19#![deny(missing_docs)]
20
21use std::sync::{Arc, Mutex};
22
23pub use osproxy_spi::HttpMethod;
24
25/// One captured request/response exchange, borrowed for the duration of the
26/// [`Capture::capture`] call. Full fidelity: bodies and header values included.
27#[derive(Clone, Copy, Debug)]
28pub struct CaptureRecord<'a> {
29    /// The proxy's correlation id for the exchange.
30    pub request_id: &'a str,
31    /// The client request method.
32    pub method: HttpMethod,
33    /// The request path (e.g. `/orders/_doc/1`).
34    pub path: &'a str,
35    /// The request query string without the `?`, if any.
36    pub query: Option<&'a str>,
37    /// The request headers (subject to any composed redaction).
38    pub headers: &'a [(String, String)],
39    /// The raw request body.
40    pub body: &'a [u8],
41    /// The status the proxy returned to the client.
42    pub response_status: u16,
43    /// The raw response body.
44    pub response_body: &'a [u8],
45}
46
47/// Receives one record per forwarded request. The recorder a capture proxy
48/// provides: a queue writer implements this.
49///
50/// Implementations MUST NOT panic. `capture` is called inline after the response
51/// is produced, so heavy delivery (a network write) belongs behind a background
52/// queue; do the minimum here.
53pub trait Capture: Send + Sync {
54    /// Whether this capture will record. The handler checks it before assembling
55    /// a record, so a disabled capture costs only this call.
56    fn enabled(&self) -> bool {
57        true
58    }
59
60    /// Records one exchange.
61    fn capture(&self, record: &CaptureRecord<'_>);
62}
63
64/// The default: capture off. No record is assembled.
65#[derive(Clone, Copy, Debug, Default)]
66pub struct NoCapture;
67
68impl Capture for NoCapture {
69    fn enabled(&self) -> bool {
70        false
71    }
72    fn capture(&self, _record: &CaptureRecord<'_>) {}
73}
74
/// Returns a copy of `headers` with every `Authorization` entry dropped.
///
/// Header names are compared ignoring ASCII case, so `authorization` and
/// `AUTHORIZATION` are removed as well. Every other header is kept, in its
/// original order and with its original value.
#[must_use]
pub fn without_authorization(headers: &[(String, String)]) -> Vec<(String, String)> {
    let mut kept = Vec::new();
    for (name, value) in headers {
        if !name.eq_ignore_ascii_case("authorization") {
            kept.push((name.clone(), value.clone()));
        }
    }
    kept
}
84
85/// Strips the `Authorization` header before the wrapped capture sees the record.
86/// Composition: redaction is a layer over any recorder, not a feature each
87/// recorder reimplements.
88///
89/// This removes the credential only. The bodies remain full-fidelity (a replay
90/// needs them); securing the stream itself is the operator's responsibility.
91#[derive(Clone, Copy, Debug, Default)]
92pub struct RedactingCapture<C> {
93    inner: C,
94}
95
96impl<C> RedactingCapture<C> {
97    /// Wraps `inner` with `Authorization`-header redaction.
98    pub fn new(inner: C) -> Self {
99        Self { inner }
100    }
101}
102
103impl<C: Capture> Capture for RedactingCapture<C> {
104    fn enabled(&self) -> bool {
105        self.inner.enabled()
106    }
107    fn capture(&self, record: &CaptureRecord<'_>) {
108        let safe_headers = without_authorization(record.headers);
109        let redacted = CaptureRecord {
110            headers: &safe_headers,
111            ..*record
112        };
113        self.inner.capture(&redacted);
114    }
115}
116
117/// An owned copy of a captured exchange, for the in-memory reference recorder.
118#[derive(Clone, Debug, PartialEq, Eq)]
119pub struct OwnedCapture {
120    /// See [`CaptureRecord::request_id`].
121    pub request_id: String,
122    /// See [`CaptureRecord::method`].
123    pub method: HttpMethod,
124    /// See [`CaptureRecord::path`].
125    pub path: String,
126    /// See [`CaptureRecord::query`].
127    pub query: Option<String>,
128    /// See [`CaptureRecord::headers`].
129    pub headers: Vec<(String, String)>,
130    /// See [`CaptureRecord::body`].
131    pub body: Vec<u8>,
132    /// See [`CaptureRecord::response_status`].
133    pub response_status: u16,
134    /// See [`CaptureRecord::response_body`].
135    pub response_body: Vec<u8>,
136}
137
138impl OwnedCapture {
139    /// Copies a borrowed record into an owned one.
140    #[must_use]
141    pub fn from_record(record: &CaptureRecord<'_>) -> Self {
142        Self {
143            request_id: record.request_id.to_owned(),
144            method: record.method,
145            path: record.path.to_owned(),
146            query: record.query.map(str::to_owned),
147            headers: record.headers.to_vec(),
148            body: record.body.to_vec(),
149            response_status: record.response_status,
150            response_body: record.response_body.to_vec(),
151        }
152    }
153}
154
155/// A reference [`Capture`] that keeps exchanges in memory. For tests and local
156/// inspection; a real deployment writes to a durable queue.
157#[derive(Clone, Default, Debug)]
158pub struct MemoryCapture {
159    records: Arc<Mutex<Vec<OwnedCapture>>>,
160}
161
162impl MemoryCapture {
163    /// An empty recorder.
164    #[must_use]
165    pub fn new() -> Self {
166        Self::default()
167    }
168
169    /// A snapshot of everything captured so far, oldest first.
170    #[must_use]
171    pub fn records(&self) -> Vec<OwnedCapture> {
172        self.records
173            .lock()
174            .unwrap_or_else(std::sync::PoisonError::into_inner)
175            .clone()
176    }
177}
178
179impl Capture for MemoryCapture {
180    fn capture(&self, record: &CaptureRecord<'_>) {
181        self.records
182            .lock()
183            .unwrap_or_else(std::sync::PoisonError::into_inner)
184            .push(OwnedCapture::from_record(record));
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// A fixed POST exchange carrying `headers`, shared by the tests below.
    fn record(headers: &[(String, String)]) -> CaptureRecord<'_> {
        CaptureRecord {
            request_id: "r1",
            method: HttpMethod::Post,
            path: "/orders/_doc",
            query: None,
            headers,
            body: br#"{"tenant_id":"acme"}"#,
            response_status: 201,
            response_body: b"{}",
        }
    }

    /// Builds an owned header pair from string slices.
    fn header(name: &str, value: &str) -> (String, String) {
        (name.to_owned(), value.to_owned())
    }

    #[test]
    fn the_default_capture_is_off() {
        assert!(!NoCapture.enabled());
        NoCapture.capture(&record(&[])); // no panic
    }

    #[test]
    fn memory_capture_keeps_full_fidelity_records() {
        let cap = MemoryCapture::new();
        let headers = vec![("content-type".to_owned(), "application/json".to_owned())];
        cap.capture(&record(&headers));
        let got = cap.records();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].path, "/orders/_doc");
        assert_eq!(got[0].body, br#"{"tenant_id":"acme"}"#);
        assert_eq!(got[0].response_status, 201);
    }

    #[test]
    fn redacting_capture_strips_only_the_authorization_header() {
        let inner = MemoryCapture::new();
        let cap = RedactingCapture::new(inner.clone());
        let headers = vec![
            ("Authorization".to_owned(), "Bearer s3cret".to_owned()),
            ("x-tenant".to_owned(), "acme".to_owned()),
        ];
        cap.capture(&record(&headers));
        let got = inner.records();
        assert_eq!(got.len(), 1);
        assert!(
            !got[0]
                .headers
                .iter()
                .any(|(k, _)| k.eq_ignore_ascii_case("authorization")),
            "credential redacted: {:?}",
            got[0].headers
        );
        // The body is still full fidelity (a replay needs it).
        assert_eq!(got[0].body, br#"{"tenant_id":"acme"}"#);
        assert!(got[0].headers.iter().any(|(k, _)| k == "x-tenant"));
    }

    #[test]
    fn redaction_ignores_header_name_case() {
        let inner = MemoryCapture::new();
        let cap = RedactingCapture::new(inner.clone());
        let headers = vec![
            header("authorization", "Bearer lower"),
            header("AUTHORIZATION", "Bearer upper"),
            header("x-tenant", "acme"),
        ];
        cap.capture(&record(&headers));
        let got = inner.records();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].headers, vec![header("x-tenant", "acme")]);
    }

    #[test]
    fn redaction_passes_credential_free_headers_through_unchanged() {
        let inner = MemoryCapture::new();
        let cap = RedactingCapture::new(inner.clone());
        let headers = vec![
            header("content-type", "application/json"),
            header("x-tenant", "acme"),
        ];
        cap.capture(&record(&headers));
        let got = inner.records();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].headers, headers);
    }

    #[test]
    fn redaction_inherits_the_inner_enabled_flag() {
        assert!(!RedactingCapture::new(NoCapture).enabled());
        assert!(RedactingCapture::new(MemoryCapture::new()).enabled());
    }
}