cellos_sink_redact/
lib.rs1use std::collections::HashSet;
34use std::sync::Arc;
35
36use async_trait::async_trait;
37use cellos_core::ports::EventSink;
38use cellos_core::{CellosError, CloudEventV1};
39use serde_json::Value;
40
/// Marker text written in place of a redacted scalar value (and as the sole
/// element of a redacted array).
const REDACTED: &str = "[redacted]";

/// Field names redacted by [`RedactingEventSink::with_defaults`] and by
/// [`RedactingEventSink::from_env`] when the environment variable is set to
/// the `defaults` keyword.
pub const DEFAULT_REDACT_FIELDS: &[&str] = &["argv", "path", "reason"];
46
47pub struct RedactingEventSink {
50 inner: Arc<dyn EventSink>,
51 fields: HashSet<String>,
52}
53
54impl RedactingEventSink {
55 pub fn new(
57 inner: Arc<dyn EventSink>,
58 fields: impl IntoIterator<Item = impl Into<String>>,
59 ) -> Self {
60 Self {
61 inner,
62 fields: fields.into_iter().map(Into::into).collect(),
63 }
64 }
65
66 pub fn with_defaults(inner: Arc<dyn EventSink>) -> Self {
69 Self::new(inner, DEFAULT_REDACT_FIELDS.iter().copied())
70 }
71
72 pub fn from_env(inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
78 let Ok(raw) = std::env::var("CELLOS_REDACT_EVENT_FIELDS") else {
79 return inner;
80 };
81 let trimmed = raw.trim();
82 if trimmed.is_empty() {
83 return inner;
84 }
85 if trimmed.eq_ignore_ascii_case("defaults") {
86 tracing::info!(fields = ?DEFAULT_REDACT_FIELDS, "event redaction enabled (defaults)");
87 return Arc::new(Self::with_defaults(inner));
88 }
89 let fields: Vec<&str> = trimmed
90 .split(',')
91 .map(str::trim)
92 .filter(|s| !s.is_empty())
93 .collect();
94 tracing::info!(?fields, "event redaction enabled");
95 Arc::new(Self::new(inner, fields))
96 }
97
98 fn redact_value(value: &mut Value, fields: &HashSet<String>) {
99 match value {
100 Value::Object(map) => {
101 for (key, val) in map.iter_mut() {
102 if fields.contains(key.as_str()) {
103 *val = redacted_placeholder(val);
104 } else {
105 Self::redact_value(val, fields);
106 }
107 }
108 }
109 Value::Array(arr) => {
110 for item in arr.iter_mut() {
111 Self::redact_value(item, fields);
112 }
113 }
114 _ => {}
115 }
116 }
117}
118
119fn redacted_placeholder(original: &Value) -> Value {
120 match original {
121 Value::Array(_) => Value::Array(vec![Value::String(REDACTED.into())]),
122 Value::Object(_) => Value::Object(serde_json::Map::new()),
123 _ => Value::String(REDACTED.into()),
124 }
125}
126
127#[async_trait]
128impl EventSink for RedactingEventSink {
129 async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
130 if event.data.is_none() || self.fields.is_empty() {
131 return self.inner.emit(event).await;
132 }
133
134 let mut redacted = event.clone();
135 if let Some(ref mut data) = redacted.data {
136 Self::redact_value(data, &self.fields);
137 }
138
139 self.inner.emit(&redacted).await
140 }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::{Mutex, MutexGuard};

    /// Test double that remembers the most recent event it was asked to emit.
    struct CaptureSink(Mutex<Option<CloudEventV1>>);

    impl CaptureSink {
        fn new() -> Arc<Self> {
            Arc::new(Self(Mutex::new(None)))
        }

        /// Returns a copy of the last captured event, if any.
        fn last(&self) -> Option<CloudEventV1> {
            self.0.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl EventSink for CaptureSink {
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            *self.0.lock().unwrap() = Some(event.clone());
            Ok(())
        }
    }

    /// Builds a minimal JSON event carrying `data` as its payload.
    fn test_event(data: Value) -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: "test-id".into(),
            source: "test".into(),
            ty: "test.event".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(data),
            time: None,
            traceparent: None,
        }
    }

    #[tokio::test]
    async fn redacts_argv_array() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["argv"]);
        let event =
            test_event(json!({ "argv": ["/usr/bin/sh", "-c", "echo hi"], "other": "keep" }));
        sink.emit(&event).await.unwrap();
        let got = capture.last().unwrap();
        let data = got.data.unwrap();
        assert_eq!(
            data["argv"],
            json!(["[redacted]"]),
            "argv should be redacted"
        );
        assert_eq!(data["other"], "keep", "other fields preserved");
    }

    #[tokio::test]
    async fn redacts_scalar_path() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["path"]);
        let event = test_event(json!({ "path": "/tmp/sensitive/artifact.tar.gz", "name": "keep" }));
        sink.emit(&event).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(data["path"], "[redacted]");
        assert_eq!(data["name"], "keep");
    }

    #[tokio::test]
    async fn redacts_nested_fields() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["argv"]);
        let event = test_event(json!({
            "run": { "argv": ["secret-program", "--token=abc123"], "env": {} },
            "id": "cell-1"
        }));
        sink.emit(&event).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(data["run"]["argv"], json!(["[redacted]"]));
        assert_eq!(data["id"], "cell-1");
    }

    #[tokio::test]
    async fn passes_through_when_no_fields() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), std::iter::empty::<&str>());
        let original = json!({ "argv": ["important"] });
        let event = test_event(original.clone());
        sink.emit(&event).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(data, original);
    }

    #[tokio::test]
    async fn passes_through_events_without_data() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::with_defaults(capture.clone());
        let event = CloudEventV1 {
            specversion: "1.0".into(),
            id: "no-data".into(),
            source: "test".into(),
            ty: "test.event".into(),
            datacontenttype: None,
            data: None,
            time: None,
            traceparent: None,
        };
        sink.emit(&event).await.unwrap();
        assert!(capture.last().unwrap().data.is_none());
    }

    /// Serialises the tests that touch the process-wide environment, since the
    /// test harness runs tests on multiple threads.
    static FROM_ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Acquires the environment lock. The mutex guards no data, so a poisoned
    /// lock is recovered rather than cascading one failure into every env test.
    fn env_lock() -> MutexGuard<'static, ()> {
        FROM_ENV_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Runs `from_env` with `CELLOS_REDACT_EVENT_FIELDS` set to `value` (or
    /// removed when `None`). The variable is set, read and cleared inside one
    /// critical section, so it never stays set while a test awaits or asserts.
    fn sink_from_env(value: Option<&str>, inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
        let _guard = env_lock();
        match value {
            Some(v) => std::env::set_var("CELLOS_REDACT_EVENT_FIELDS", v),
            None => std::env::remove_var("CELLOS_REDACT_EVENT_FIELDS"),
        }
        let sink = RedactingEventSink::from_env(inner);
        std::env::remove_var("CELLOS_REDACT_EVENT_FIELDS");
        sink
    }

    #[tokio::test]
    async fn from_env_noop_when_var_absent() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(None, capture.clone() as Arc<dyn EventSink>);
        let original = json!({ "argv": ["not-redacted"] });
        sink.emit(&test_event(original.clone())).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(data, original, "payload must be untouched when var is absent");
    }

    #[tokio::test]
    async fn from_env_separator_only_value_passes_through() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some(" , ,"), capture.clone() as Arc<dyn EventSink>);
        let original = json!({ "argv": ["not-redacted"], "path": "/keep/me" });
        sink.emit(&test_event(original.clone())).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(data, original, "a list naming no fields must redact nothing");
    }

    #[tokio::test]
    async fn from_env_defaults_keyword_redacts_default_fields() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some("defaults"), capture.clone() as Arc<dyn EventSink>);
        let event = test_event(json!({
            "argv": ["/usr/bin/sh", "-c", "echo secret"],
            "path": "/tmp/secret-file",
            "reason": "exit 1",
            "other": "keep"
        }));
        sink.emit(&event).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(data["argv"], json!(["[redacted]"]), "argv in defaults");
        assert_eq!(data["path"], "[redacted]", "path in defaults");
        assert_eq!(data["reason"], "[redacted]", "reason in defaults");
        assert_eq!(data["other"], "keep", "non-default field preserved");
    }

    #[tokio::test]
    async fn from_env_explicit_field_list_redacts_only_named_fields() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some("path"), capture.clone() as Arc<dyn EventSink>);
        let event = test_event(json!({ "argv": ["keep-this"], "path": "/secret/path" }));
        sink.emit(&event).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(
            data["argv"],
            json!(["keep-this"]),
            "argv not in field list, must be kept"
        );
        assert_eq!(data["path"], "[redacted]", "path is in field list");
    }
}