Skip to main content

trace_weft/
capture.rs

1//! Content capture.
2//!
3//! Turns serialized inputs/outputs into [`BlobRef`]s and persists the bytes to
4//! a [`BlobStore`], gated by the configured [`CapturePolicy`]. The
5//! instrumentation macros and `SpanBuilder` call [`capture_json`] to populate a
6//! span's `input_ref` / `output_ref`.
7//!
//! Capture is opt-in at the process level: until [`init_capture`] runs (which
//! `init_local` does for you), [`capture_enabled`] is `false` and nothing is
//! serialized or stored. It also stays `false` once a `MetadataOnly` policy is
//! installed, so such a deployment pays no capture cost.
11
12use std::sync::Arc;
13use tokio::sync::OnceCell;
14use trace_weft_core::{
15    BlobHash, BlobRef, BlobStore, CapturePolicy, RedactionStatus, redactor::ArcRedactor,
16};
17
/// Byte budget for the preview text stored in `BlobRef::preview_text_redacted`.
const PREVIEW_MAX_BYTES: usize = 512;
19
20/// Process-wide capture configuration.
21pub struct CaptureConfig {
22    pub policy: CapturePolicy,
23    pub blobs: Arc<dyn BlobStore>,
24    pub redactor: ArcRedactor,
25    /// Recorded verbatim into `BlobRef::storage_backend` (e.g. `"local_fs"`).
26    pub storage_backend: String,
27}
28
29static CAPTURE: OnceCell<CaptureConfig> = OnceCell::const_new();
30
31/// Install the process-wide capture configuration. Errors if already set.
32pub fn init_capture(config: CaptureConfig) -> anyhow::Result<()> {
33    CAPTURE
34        .set(config)
35        .map_err(|_| anyhow::anyhow!("Capture already initialized"))?;
36    Ok(())
37}
38
39/// Whether content capture is active. `false` when capture is uninitialized or
40/// the policy is `MetadataOnly`; callers should skip serialization entirely in
41/// that case.
42pub fn capture_enabled() -> bool {
43    CAPTURE
44        .get()
45        .is_some_and(|c| !matches!(c.policy, CapturePolicy::MetadataOnly))
46}
47
48/// Serialize already-built JSON content into a stored blob and return a
49/// [`BlobRef`] describing it, honoring the configured policy. Returns `None`
50/// when capture is disabled.
51pub async fn capture_json(content_type: &str, value: serde_json::Value) -> Option<BlobRef> {
52    let cfg = CAPTURE.get()?;
53    if matches!(cfg.policy, CapturePolicy::MetadataOnly) {
54        return None;
55    }
56
57    let raw = serde_json::to_vec(&value).ok()?;
58    let raw_text = String::from_utf8_lossy(&raw);
59
60    let (stored_bytes, redaction_status, preview_text) = match cfg.policy {
61        CapturePolicy::RedactedPreview => {
62            let result = cfg.redactor.redact(&raw_text);
63            let preview = preview(&result.redacted_text);
64            (result.redacted_text.into_bytes(), result.status, preview)
65        }
66        CapturePolicy::FullContentLocalOnly | CapturePolicy::FullContentExportable => {
67            let preview = preview(&raw_text);
68            (raw.clone(), RedactionStatus::Unredacted, preview)
69        }
70        CapturePolicy::MetadataOnly => return None,
71    };
72
73    let hash = BlobHash(sha256_hex(&stored_bytes));
74    let size_bytes = stored_bytes.len() as u64;
75
76    // A failed blob write must not lose the span; record the ref regardless.
77    if let Err(err) = cfg.blobs.put_blob(&hash, content_type, &stored_bytes).await {
78        tracing::warn!(error = %err, "failed to persist captured blob");
79    }
80
81    Some(BlobRef {
82        hash,
83        content_type: content_type.to_string(),
84        size_bytes,
85        created_at_timestamp: now_ms(),
86        redaction_status,
87        encryption_status: "none".to_string(),
88        storage_backend: cfg.storage_backend.clone(),
89        preview_text_redacted: Some(preview_text),
90    })
91}
92
93fn sha256_hex(bytes: &[u8]) -> String {
94    use sha2::{Digest, Sha256};
95    let mut hasher = Sha256::new();
96    hasher.update(bytes);
97    format!("sha256:{:x}", hasher.finalize())
98}
99
100fn preview(text: &str) -> String {
101    if text.len() <= PREVIEW_MAX_BYTES {
102        return text.to_string();
103    }
104    let mut end = PREVIEW_MAX_BYTES;
105    while !text.is_char_boundary(end) {
106        end -= 1;
107    }
108    format!("{}…", &text[..end])
109}
110
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. A millisecond count that does not
/// fit in `u64` saturates to `u64::MAX` instead of being silently truncated by
/// an `as` cast.
fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
117
/// Blob store backed by a local directory, one file per blob.
///
/// Each file is named after the blob hash with ':' replaced by '_' (so
/// `sha256:ab12…` is stored as `sha256_ab12…`). Used by `init_local`.
pub struct FsBlobStore {
    /// Directory that holds the blob files.
    dir: std::path::PathBuf,
}

impl FsBlobStore {
    /// Create a store rooted at `dir`. Only the path is recorded; nothing is
    /// touched on disk here.
    pub fn new(dir: impl Into<std::path::PathBuf>) -> Self {
        let dir = dir.into();
        Self { dir }
    }
}
129
130#[async_trait::async_trait]
131impl BlobStore for FsBlobStore {
132    async fn put_blob(
133        &self,
134        hash: &BlobHash,
135        _content_type: &str,
136        content: &[u8],
137    ) -> anyhow::Result<()> {
138        // Hashes are prefixed (`sha256:`); ':' is not portable in filenames.
139        let path = self.dir.join(hash.0.replace(':', "_"));
140        tokio::fs::write(path, content).await?;
141        Ok(())
142    }
143
144    async fn get_blob(&self, hash: &BlobHash) -> anyhow::Result<Option<Vec<u8>>> {
145        let path = self.dir.join(hash.0.replace(':', "_"));
146        match tokio::fs::read(path).await {
147            Ok(bytes) => Ok(Some(bytes)),
148            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
149            Err(e) => Err(e.into()),
150        }
151    }
152}
153
/// Blob store that keeps every blob in process memory, for tests and
/// short-lived runs. Clones share the same underlying map.
#[derive(Clone, Default)]
pub struct MemoryBlobStore {
    /// Hash string mapped to the stored bytes; shared between clones.
    blobs: Arc<std::sync::Mutex<std::collections::HashMap<String, Vec<u8>>>>,
}

impl MemoryBlobStore {
    /// Create an empty store.
    pub fn new() -> Self {
        Self {
            blobs: Arc::default(),
        }
    }

    /// Number of blobs currently held.
    pub fn len(&self) -> usize {
        let guard = self.blobs.lock().unwrap();
        guard.len()
    }

    /// `true` when the store holds no blobs.
    pub fn is_empty(&self) -> bool {
        self.blobs.lock().unwrap().is_empty()
    }
}
173
174#[async_trait::async_trait]
175impl BlobStore for MemoryBlobStore {
176    async fn put_blob(
177        &self,
178        hash: &BlobHash,
179        _content_type: &str,
180        content: &[u8],
181    ) -> anyhow::Result<()> {
182        self.blobs
183            .lock()
184            .unwrap()
185            .insert(hash.0.clone(), content.to_vec());
186        Ok(())
187    }
188
189    async fn get_blob(&self, hash: &BlobHash) -> anyhow::Result<Option<Vec<u8>>> {
190        Ok(self.blobs.lock().unwrap().get(&hash.0).cloned())
191    }
192}