Skip to main content

trace_weft/
capture.rs

//! Content capture.
//!
//! Turns serialized inputs/outputs into [`BlobRef`]s and persists the bytes to
//! a [`BlobStore`], gated by the configured [`CapturePolicy`]. The
//! instrumentation macros and `SpanBuilder` call [`capture_json`] to populate a
//! span's `input_ref` / `output_ref`.
//!
//! Capture is opt-in at the process level: before [`init_capture`] runs
//! (`init_local` calls it for you), and whenever the installed policy is
//! `MetadataOnly`, [`capture_enabled`] is `false` and nothing is serialized or
//! stored, so such deployments pay no capture cost.
11
12use std::sync::{Arc, OnceLock};
13use tokio::sync::OnceCell;
14use trace_weft_core::{
15    BlobHash, BlobRef, BlobStore, CapturePolicy, RedactionResult, RedactionStatus, Redactor,
16    redactor::{ArcRedactor, RegexRedactor},
17};
18
/// Truncation threshold, in bytes, for the redacted preview text attached to
/// each captured [`BlobRef`]. Text at or under this size is kept whole.
const PREVIEW_MAX_BYTES: usize = 512;
20
21/// Process-wide capture configuration.
22pub struct CaptureConfig {
23    pub policy: CapturePolicy,
24    pub blobs: Arc<dyn BlobStore>,
25    pub redactor: ArcRedactor,
26    /// Recorded verbatim into `BlobRef::storage_backend` (e.g. `"local_fs"`).
27    pub storage_backend: String,
28}
29
30static CAPTURE: OnceCell<CaptureConfig> = OnceCell::const_new();
31static FALLBACK_REDACTOR: OnceLock<RegexRedactor> = OnceLock::new();
32
33/// Install the process-wide capture configuration. Errors if already set.
34pub fn init_capture(config: CaptureConfig) -> anyhow::Result<()> {
35    CAPTURE
36        .set(config)
37        .map_err(|_| anyhow::anyhow!("Capture already initialized"))?;
38    Ok(())
39}
40
41/// Whether content capture is active. `false` when capture is uninitialized or
42/// the policy is `MetadataOnly`; callers should skip serialization entirely in
43/// that case.
44pub fn capture_enabled() -> bool {
45    CAPTURE
46        .get()
47        .is_some_and(|c| !matches!(c.policy, CapturePolicy::MetadataOnly))
48}
49
50/// The active process-wide capture policy, or [`CapturePolicy::MetadataOnly`]
51/// when capture has not been initialized.
52pub fn capture_policy() -> CapturePolicy {
53    CAPTURE
54        .get()
55        .map(|c| c.policy)
56        .unwrap_or(CapturePolicy::MetadataOnly)
57}
58
59/// Redact text with the configured redactor, falling back to the default regex
60/// redactor even when content capture has not been initialized.
61///
62/// This keeps metadata-only traces from leaking secrets through error strings
63/// or status messages while still avoiding input/output blob capture.
64pub fn redact_text(input: &str) -> RedactionResult {
65    if let Some(cfg) = CAPTURE.get() {
66        return cfg.redactor.redact(input);
67    }
68
69    FALLBACK_REDACTOR
70        .get_or_init(RegexRedactor::default)
71        .redact(input)
72}
73
74/// Serialize already-built JSON content into a stored blob and return a
75/// [`BlobRef`] describing it, honoring the configured policy. Returns `None`
76/// when capture is disabled.
77pub async fn capture_json(content_type: &str, value: serde_json::Value) -> Option<BlobRef> {
78    let cfg = CAPTURE.get()?;
79    if matches!(cfg.policy, CapturePolicy::MetadataOnly) {
80        return None;
81    }
82
83    let raw = serde_json::to_vec(&value).ok()?;
84    let (stored_bytes, redaction_status, preview_text) =
85        capture_parts(cfg.policy, &raw, cfg.redactor.as_ref())?;
86
87    let hash = BlobHash(sha256_hex(&stored_bytes));
88    let size_bytes = stored_bytes.len() as u64;
89
90    // A failed blob write must not lose the span; record the ref regardless.
91    if let Err(err) = cfg.blobs.put_blob(&hash, content_type, &stored_bytes).await {
92        tracing::warn!(error = %err, "failed to persist captured blob");
93    }
94
95    Some(BlobRef {
96        hash,
97        content_type: content_type.to_string(),
98        size_bytes,
99        created_at_timestamp: now_ms(),
100        redaction_status,
101        encryption_status: "none".to_string(),
102        storage_backend: cfg.storage_backend.clone(),
103        preview_text_redacted: Some(preview_text),
104    })
105}
106
107fn capture_parts(
108    policy: CapturePolicy,
109    raw: &[u8],
110    redactor: &dyn Redactor,
111) -> Option<(Vec<u8>, RedactionStatus, String)> {
112    let raw_text = String::from_utf8_lossy(raw);
113    let redacted = redactor.redact(&raw_text);
114
115    match policy {
116        CapturePolicy::RedactedPreview => {
117            let preview = preview(&redacted.redacted_text);
118            Some((
119                redacted.redacted_text.into_bytes(),
120                redacted.status,
121                preview,
122            ))
123        }
124        CapturePolicy::FullContentLocalOnly | CapturePolicy::FullContentExportable => {
125            let preview = preview(&redacted.redacted_text);
126            Some((raw.to_vec(), RedactionStatus::Unredacted, preview))
127        }
128        CapturePolicy::MetadataOnly => None,
129    }
130}
131
132fn sha256_hex(bytes: &[u8]) -> String {
133    use sha2::{Digest, Sha256};
134    let mut hasher = Sha256::new();
135    hasher.update(bytes);
136    format!("sha256:{:x}", hasher.finalize())
137}
138
139fn preview(text: &str) -> String {
140    if text.len() <= PREVIEW_MAX_BYTES {
141        return text.to_string();
142    }
143    let mut end = PREVIEW_MAX_BYTES;
144    while !text.is_char_boundary(end) {
145        end -= 1;
146    }
147    format!("{}…", &text[..end])
148}
149
/// Return milliseconds since the Unix epoch, used as `BlobRef::created_at_timestamp`.
///
/// A clock set before the epoch yields `0`. A value too large for `u64`
/// saturates at `u64::MAX` instead of silently truncating the `u128`.
fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
156
157/// Filesystem blob store: writes each blob to `dir/<hash>`. Used by
158/// `init_local`.
159pub struct FsBlobStore {
160    dir: std::path::PathBuf,
161}
162
163impl FsBlobStore {
164    pub fn new(dir: impl Into<std::path::PathBuf>) -> Self {
165        Self { dir: dir.into() }
166    }
167}
168
169#[async_trait::async_trait]
170impl BlobStore for FsBlobStore {
171    async fn put_blob(
172        &self,
173        hash: &BlobHash,
174        _content_type: &str,
175        content: &[u8],
176    ) -> anyhow::Result<()> {
177        tokio::fs::create_dir_all(&self.dir).await?;
178        // Hashes are prefixed (`sha256:`); ':' is not portable in filenames.
179        let path = self.dir.join(hash.0.replace(':', "_"));
180        tokio::fs::write(path, content).await?;
181        Ok(())
182    }
183
184    async fn get_blob(&self, hash: &BlobHash) -> anyhow::Result<Option<Vec<u8>>> {
185        let path = self.dir.join(hash.0.replace(':', "_"));
186        match tokio::fs::read(path).await {
187            Ok(bytes) => Ok(Some(bytes)),
188            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
189            Err(e) => Err(e.into()),
190        }
191    }
192}
193
194/// In-memory blob store, handy for tests and ephemeral runs.
195#[derive(Clone, Default)]
196pub struct MemoryBlobStore {
197    blobs: Arc<std::sync::Mutex<std::collections::HashMap<String, Vec<u8>>>>,
198}
199
200impl MemoryBlobStore {
201    pub fn new() -> Self {
202        Self::default()
203    }
204
205    pub fn len(&self) -> usize {
206        self.blobs.lock().unwrap().len()
207    }
208
209    pub fn is_empty(&self) -> bool {
210        self.len() == 0
211    }
212}
213
214#[async_trait::async_trait]
215impl BlobStore for MemoryBlobStore {
216    async fn put_blob(
217        &self,
218        hash: &BlobHash,
219        _content_type: &str,
220        content: &[u8],
221    ) -> anyhow::Result<()> {
222        self.blobs
223            .lock()
224            .unwrap()
225            .insert(hash.0.clone(), content.to_vec());
226        Ok(())
227    }
228
229    async fn get_blob(&self, hash: &BlobHash) -> anyhow::Result<Option<Vec<u8>>> {
230        Ok(self.blobs.lock().unwrap().get(&hash.0).cloned())
231    }
232}
233
#[cfg(test)]
mod tests {
    use super::*;

    const EMAIL_JSON: &[u8] = br#"{"email":"dev@example.com"}"#;
    const REDACTED_EMAIL_JSON: &str = r#"{"email":"[REDACTED]"}"#;

    /// Run `capture_parts` with the default regex redactor.
    fn parts(policy: CapturePolicy, raw: &[u8]) -> Option<(Vec<u8>, RedactionStatus, String)> {
        capture_parts(policy, raw, &RegexRedactor::default())
    }

    #[test]
    fn fallback_redacts_text_without_capture_init() {
        let out = redact_text("failed with Bearer abc.DEF-123~xyz");
        assert_eq!(out.status, RedactionStatus::Redacted);
        assert_eq!(out.redacted_text, "failed with [REDACTED]");
    }

    #[test]
    fn redacted_preview_stores_only_redacted_bytes() {
        let (stored, status, excerpt) =
            parts(CapturePolicy::RedactedPreview, EMAIL_JSON).expect("capture enabled");

        assert_eq!(status, RedactionStatus::Redacted);
        assert_eq!(String::from_utf8(stored).unwrap(), REDACTED_EMAIL_JSON);
        assert_eq!(excerpt, REDACTED_EMAIL_JSON);
    }

    #[test]
    fn full_content_keeps_raw_blob_but_redacts_preview() {
        let (stored, status, excerpt) =
            parts(CapturePolicy::FullContentLocalOnly, EMAIL_JSON).expect("capture enabled");

        assert_eq!(status, RedactionStatus::Unredacted);
        assert_eq!(stored, EMAIL_JSON);
        assert_eq!(excerpt, REDACTED_EMAIL_JSON);
    }

    #[test]
    fn metadata_only_captures_nothing() {
        assert!(parts(CapturePolicy::MetadataOnly, b"secret").is_none());
    }
}