1use std::sync::Arc;
13use tokio::sync::OnceCell;
14use trace_weft_core::{
15 BlobHash, BlobRef, BlobStore, CapturePolicy, RedactionStatus, redactor::ArcRedactor,
16};
17
18const PREVIEW_MAX_BYTES: usize = 512;
19
20pub struct CaptureConfig {
22 pub policy: CapturePolicy,
23 pub blobs: Arc<dyn BlobStore>,
24 pub redactor: ArcRedactor,
25 pub storage_backend: String,
27}
28
29static CAPTURE: OnceCell<CaptureConfig> = OnceCell::const_new();
30
31pub fn init_capture(config: CaptureConfig) -> anyhow::Result<()> {
33 CAPTURE
34 .set(config)
35 .map_err(|_| anyhow::anyhow!("Capture already initialized"))?;
36 Ok(())
37}
38
39pub fn capture_enabled() -> bool {
43 CAPTURE
44 .get()
45 .is_some_and(|c| !matches!(c.policy, CapturePolicy::MetadataOnly))
46}
47
48pub async fn capture_json(content_type: &str, value: serde_json::Value) -> Option<BlobRef> {
52 let cfg = CAPTURE.get()?;
53 if matches!(cfg.policy, CapturePolicy::MetadataOnly) {
54 return None;
55 }
56
57 let raw = serde_json::to_vec(&value).ok()?;
58 let raw_text = String::from_utf8_lossy(&raw);
59
60 let (stored_bytes, redaction_status, preview_text) = match cfg.policy {
61 CapturePolicy::RedactedPreview => {
62 let result = cfg.redactor.redact(&raw_text);
63 let preview = preview(&result.redacted_text);
64 (result.redacted_text.into_bytes(), result.status, preview)
65 }
66 CapturePolicy::FullContentLocalOnly | CapturePolicy::FullContentExportable => {
67 let preview = preview(&raw_text);
68 (raw.clone(), RedactionStatus::Unredacted, preview)
69 }
70 CapturePolicy::MetadataOnly => return None,
71 };
72
73 let hash = BlobHash(sha256_hex(&stored_bytes));
74 let size_bytes = stored_bytes.len() as u64;
75
76 if let Err(err) = cfg.blobs.put_blob(&hash, content_type, &stored_bytes).await {
78 tracing::warn!(error = %err, "failed to persist captured blob");
79 }
80
81 Some(BlobRef {
82 hash,
83 content_type: content_type.to_string(),
84 size_bytes,
85 created_at_timestamp: now_ms(),
86 redaction_status,
87 encryption_status: "none".to_string(),
88 storage_backend: cfg.storage_backend.clone(),
89 preview_text_redacted: Some(preview_text),
90 })
91}
92
93fn sha256_hex(bytes: &[u8]) -> String {
94 use sha2::{Digest, Sha256};
95 let mut hasher = Sha256::new();
96 hasher.update(bytes);
97 format!("sha256:{:x}", hasher.finalize())
98}
99
100fn preview(text: &str) -> String {
101 if text.len() <= PREVIEW_MAX_BYTES {
102 return text.to_string();
103 }
104 let mut end = PREVIEW_MAX_BYTES;
105 while !text.is_char_boundary(end) {
106 end -= 1;
107 }
108 format!("{}…", &text[..end])
109}
110
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reads earlier than the epoch. The `u128`
/// millisecond count is narrowed with `as`, which only matters for times
/// far beyond any realistic clock value.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis() as u64)
}
117
118pub struct FsBlobStore {
121 dir: std::path::PathBuf,
122}
123
124impl FsBlobStore {
125 pub fn new(dir: impl Into<std::path::PathBuf>) -> Self {
126 Self { dir: dir.into() }
127 }
128}
129
130#[async_trait::async_trait]
131impl BlobStore for FsBlobStore {
132 async fn put_blob(
133 &self,
134 hash: &BlobHash,
135 _content_type: &str,
136 content: &[u8],
137 ) -> anyhow::Result<()> {
138 let path = self.dir.join(hash.0.replace(':', "_"));
140 tokio::fs::write(path, content).await?;
141 Ok(())
142 }
143
144 async fn get_blob(&self, hash: &BlobHash) -> anyhow::Result<Option<Vec<u8>>> {
145 let path = self.dir.join(hash.0.replace(':', "_"));
146 match tokio::fs::read(path).await {
147 Ok(bytes) => Ok(Some(bytes)),
148 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
149 Err(e) => Err(e.into()),
150 }
151 }
152}
153
/// In-memory [`BlobStore`] keyed by the full hash string (e.g. `sha256:<hex>`).
///
/// Cloning is cheap, and every clone shares the same underlying map.
#[derive(Clone, Default)]
pub struct MemoryBlobStore {
    blobs: Arc<std::sync::Mutex<std::collections::HashMap<String, Vec<u8>>>>,
}

impl MemoryBlobStore {
    /// Creates an empty store.
    pub fn new() -> Self {
        Self {
            blobs: Arc::default(),
        }
    }

    /// Number of blobs currently held.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn len(&self) -> usize {
        let map = self.blobs.lock().unwrap();
        map.len()
    }

    /// Returns `true` when the store holds no blobs.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn is_empty(&self) -> bool {
        self.blobs.lock().unwrap().is_empty()
    }
}
173
174#[async_trait::async_trait]
175impl BlobStore for MemoryBlobStore {
176 async fn put_blob(
177 &self,
178 hash: &BlobHash,
179 _content_type: &str,
180 content: &[u8],
181 ) -> anyhow::Result<()> {
182 self.blobs
183 .lock()
184 .unwrap()
185 .insert(hash.0.clone(), content.to_vec());
186 Ok(())
187 }
188
189 async fn get_blob(&self, hash: &BlobHash) -> anyhow::Result<Option<Vec<u8>>> {
190 Ok(self.blobs.lock().unwrap().get(&hash.0).cloned())
191 }
192}