Skip to main content

cellos_sink_redact/
lib.rs

1//! [`EventSink`] wrapper that redacts nominated JSON fields before forwarding.
2//!
3//! CloudEvents emitted by the supervisor can contain:
4//! - `argv` — command lines, which may include secrets passed as arguments
5//! - `path` — artifact and working-directory filesystem paths
6//! - `secretRefs` — names (not values) of secrets referenced by the cell
7//! - `reason` — teardown error messages, which may contain paths or argv fragments
8//!
9//! This sink wraps any `Arc<dyn EventSink>` and, before calling `inner.emit()`,
10//! walks the `CloudEventV1.data` JSON tree replacing the **values** of nominated
11//! top-level and nested field names with a fixed placeholder string.
12//!
13//! # Activation
14//!
15//! In the composition root, set `CELLOS_REDACT_EVENT_FIELDS` to a
16//! comma-separated list of JSON field names to redact, e.g.:
17//! ```text
18//! CELLOS_REDACT_EVENT_FIELDS=argv,path,reason
19//! ```
20//!
21//! Use `CELLOS_REDACT_EVENT_FIELDS=defaults` to apply the built-in
22//! conservative set: `argv,path,reason`.
23//!
24//! # Security notes
25//!
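//! - Secret **values** are never emitted as such; events carry only secret *names*
//!   (`secretRefs`). A secret passed on a command line can still surface through
//!   `argv`, which is why `argv` is part of the default redaction set.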
27//! - Redaction applies to the JSON value tree, not to CloudEvent metadata fields
28//!   (id, source, type, time).
29//! - The placeholder value is the fixed string `"[redacted]"` for scalar values
30//!   and `["[redacted]"]` for arrays; maps are replaced with `{}`.
31//! - An absent or empty `data` field is forwarded unchanged.
32
33use std::collections::HashSet;
34use std::sync::Arc;
35
36use async_trait::async_trait;
37use cellos_core::ports::EventSink;
38use cellos_core::{CellosError, CloudEventV1};
39use serde_json::Value;
40
/// Marker text written in place of a redacted scalar; it is also the single
/// element of the array that stands in for a redacted array.
const REDACTED: &str = "[redacted]";

/// Field names redacted when the caller asks for the built-in conservative
/// set (see `RedactingEventSink::with_defaults` and the `defaults` keyword of
/// `CELLOS_REDACT_EVENT_FIELDS`).
pub const DEFAULT_REDACT_FIELDS: &[&str] = &["argv", "path", "reason"];
46
47/// Wraps an inner [`EventSink`], walking the `data` JSON tree and replacing
48/// the values of any nominated field names with the placeholder `"[redacted]"`.
49pub struct RedactingEventSink {
50    inner: Arc<dyn EventSink>,
51    fields: HashSet<String>,
52}
53
54impl RedactingEventSink {
55    /// Create a new redacting sink with an explicit set of field names.
56    pub fn new(
57        inner: Arc<dyn EventSink>,
58        fields: impl IntoIterator<Item = impl Into<String>>,
59    ) -> Self {
60        Self {
61            inner,
62            fields: fields.into_iter().map(Into::into).collect(),
63        }
64    }
65
66    /// Create a new redacting sink with the default conservative field set:
67    /// `argv`, `path`, `reason`.
68    pub fn with_defaults(inner: Arc<dyn EventSink>) -> Self {
69        Self::new(inner, DEFAULT_REDACT_FIELDS.iter().copied())
70    }
71
72    /// Parse `CELLOS_REDACT_EVENT_FIELDS` from the environment and return a
73    /// redacting sink if the variable is set and non-empty, otherwise return
74    /// the inner sink unwrapped.
75    ///
76    /// `"defaults"` expands to the built-in field set.
77    pub fn from_env(inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
78        let Ok(raw) = std::env::var("CELLOS_REDACT_EVENT_FIELDS") else {
79            return inner;
80        };
81        let trimmed = raw.trim();
82        if trimmed.is_empty() {
83            return inner;
84        }
85        if trimmed.eq_ignore_ascii_case("defaults") {
86            tracing::info!(fields = ?DEFAULT_REDACT_FIELDS, "event redaction enabled (defaults)");
87            return Arc::new(Self::with_defaults(inner));
88        }
89        let fields: Vec<&str> = trimmed
90            .split(',')
91            .map(str::trim)
92            .filter(|s| !s.is_empty())
93            .collect();
94        tracing::info!(?fields, "event redaction enabled");
95        Arc::new(Self::new(inner, fields))
96    }
97
98    fn redact_value(value: &mut Value, fields: &HashSet<String>) {
99        match value {
100            Value::Object(map) => {
101                for (key, val) in map.iter_mut() {
102                    if fields.contains(key.as_str()) {
103                        *val = redacted_placeholder(val);
104                    } else {
105                        Self::redact_value(val, fields);
106                    }
107                }
108            }
109            Value::Array(arr) => {
110                for item in arr.iter_mut() {
111                    Self::redact_value(item, fields);
112                }
113            }
114            _ => {}
115        }
116    }
117}
118
119fn redacted_placeholder(original: &Value) -> Value {
120    match original {
121        Value::Array(_) => Value::Array(vec![Value::String(REDACTED.into())]),
122        Value::Object(_) => Value::Object(serde_json::Map::new()),
123        _ => Value::String(REDACTED.into()),
124    }
125}
126
127#[async_trait]
128impl EventSink for RedactingEventSink {
129    async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
130        if event.data.is_none() || self.fields.is_empty() {
131            return self.inner.emit(event).await;
132        }
133
134        let mut redacted = event.clone();
135        if let Some(ref mut data) = redacted.data {
136            Self::redact_value(data, &self.fields);
137        }
138
139        self.inner.emit(&redacted).await
140    }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::{Mutex, MutexGuard};

    /// Name of the environment variable read by `RedactingEventSink::from_env`.
    const ENV_VAR: &str = "CELLOS_REDACT_EVENT_FIELDS";

    /// Capture sink records the last event it received.
    struct CaptureSink(Mutex<Option<CloudEventV1>>);

    impl CaptureSink {
        fn new() -> Arc<Self> {
            Arc::new(Self(Mutex::new(None)))
        }

        fn last(&self) -> Option<CloudEventV1> {
            self.0.lock().unwrap().clone()
        }

        /// Payload of the most recent event; panics if nothing was captured
        /// or the captured event had no `data`.
        fn last_data(&self) -> Value {
            self.last()
                .expect("an event should have been captured")
                .data
                .expect("captured event should carry data")
        }
    }

    #[async_trait]
    impl EventSink for CaptureSink {
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            *self.0.lock().unwrap() = Some(event.clone());
            Ok(())
        }
    }

    fn test_event(data: Value) -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: "test-id".into(),
            source: "test".into(),
            ty: "test.event".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(data),
            time: None,
            traceparent: None,
        }
    }

    #[tokio::test]
    async fn redacts_argv_array() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["argv"]);
        let event =
            test_event(json!({ "argv": ["/usr/bin/sh", "-c", "echo hi"], "other": "keep" }));
        sink.emit(&event).await.unwrap();
        let data = capture.last_data();
        assert_eq!(
            data["argv"],
            json!(["[redacted]"]),
            "argv should be redacted"
        );
        assert_eq!(data["other"], "keep", "other fields preserved");
    }

    #[tokio::test]
    async fn redacts_scalar_path() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["path"]);
        let event = test_event(json!({ "path": "/tmp/sensitive/artifact.tar.gz", "name": "keep" }));
        sink.emit(&event).await.unwrap();
        let data = capture.last_data();
        assert_eq!(data["path"], "[redacted]");
        assert_eq!(data["name"], "keep");
    }

    #[tokio::test]
    async fn redacts_object_value_with_empty_map() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["path"]);
        let event = test_event(json!({ "path": { "dir": "/secret", "file": "a.txt" } }));
        sink.emit(&event).await.unwrap();
        assert_eq!(capture.last_data()["path"], json!({}));
    }

    #[tokio::test]
    async fn redacts_nested_fields() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["argv"]);
        let event = test_event(json!({
            "run": { "argv": ["secret-program", "--token=abc123"], "env": {} },
            "id": "cell-1"
        }));
        sink.emit(&event).await.unwrap();
        let data = capture.last_data();
        assert_eq!(data["run"]["argv"], json!(["[redacted]"]));
        assert_eq!(data["id"], "cell-1");
    }

    #[tokio::test]
    async fn redacts_fields_inside_array_elements() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["argv"]);
        let event = test_event(json!({
            "steps": [
                { "argv": ["a", "--token=1"], "name": "first" },
                { "argv": "single-string", "name": "second" }
            ]
        }));
        sink.emit(&event).await.unwrap();
        let data = capture.last_data();
        assert_eq!(data["steps"][0]["argv"], json!(["[redacted]"]));
        assert_eq!(data["steps"][0]["name"], "first");
        assert_eq!(data["steps"][1]["argv"], "[redacted]");
        assert_eq!(data["steps"][1]["name"], "second");
    }

    #[tokio::test]
    async fn does_not_mutate_callers_event() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["argv"]);
        let original = json!({ "argv": ["keep-in-caller-copy"] });
        let event = test_event(original.clone());
        sink.emit(&event).await.unwrap();
        assert_eq!(event.data, Some(original));
    }

    #[tokio::test]
    async fn passes_through_when_no_fields() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), std::iter::empty::<&str>());
        let original = json!({ "argv": ["important"] });
        let event = test_event(original.clone());
        sink.emit(&event).await.unwrap();
        assert_eq!(capture.last_data(), original);
    }

    #[tokio::test]
    async fn passes_through_events_without_data() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::with_defaults(capture.clone());
        let event = CloudEventV1 {
            id: "no-data".into(),
            datacontenttype: None,
            data: None,
            ..test_event(Value::Null)
        };
        sink.emit(&event).await.unwrap();
        assert!(capture.last().unwrap().data.is_none());
    }

    /// Guards env-var mutation so from_env tests don't race with each other.
    static FROM_ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Take the env lock, tolerating poisoning so that one failing env test
    /// does not cascade into spurious failures in the others.
    fn env_lock() -> MutexGuard<'static, ()> {
        FROM_ENV_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Run `RedactingEventSink::from_env` with the variable set to `value`
    /// (or removed when `None`). Setup, the call, and cleanup all happen
    /// inside one critical section, so the variable never leaks past it.
    fn sink_from_env(value: Option<&str>, inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
        let _guard = env_lock();
        match value {
            Some(v) => std::env::set_var(ENV_VAR, v),
            None => std::env::remove_var(ENV_VAR),
        }
        let sink = RedactingEventSink::from_env(inner);
        std::env::remove_var(ENV_VAR);
        sink
    }

    #[tokio::test]
    async fn from_env_noop_when_var_absent() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(None, capture.clone() as Arc<dyn EventSink>);
        let original = json!({ "argv": ["not-redacted"] });
        sink.emit(&test_event(original.clone())).await.unwrap();
        assert_eq!(
            capture.last_data(),
            original,
            "nothing may be redacted when the variable is absent"
        );
    }

    #[tokio::test]
    async fn from_env_noop_when_value_has_only_separators() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some(" , ,"), capture.clone() as Arc<dyn EventSink>);
        let original = json!({ "argv": ["not-redacted"], "path": "/keep" });
        sink.emit(&test_event(original.clone())).await.unwrap();
        assert_eq!(
            capture.last_data(),
            original,
            "a value naming no fields must not redact anything"
        );
    }

    #[tokio::test]
    async fn from_env_defaults_keyword_redacts_default_fields() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some("defaults"), capture.clone() as Arc<dyn EventSink>);
        let event = test_event(json!({
            "argv": ["/usr/bin/sh", "-c", "echo secret"],
            "path": "/tmp/secret-file",
            "reason": "exit 1",
            "other": "keep"
        }));
        sink.emit(&event).await.unwrap();
        let data = capture.last_data();
        assert_eq!(data["argv"], json!(["[redacted]"]), "argv in defaults");
        assert_eq!(data["path"], "[redacted]", "path in defaults");
        assert_eq!(data["reason"], "[redacted]", "reason in defaults");
        assert_eq!(data["other"], "keep", "non-default field preserved");
    }

    #[tokio::test]
    async fn from_env_explicit_field_list_redacts_only_named_fields() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some("path"), capture.clone() as Arc<dyn EventSink>);
        let event = test_event(json!({ "argv": ["keep-this"], "path": "/secret/path" }));
        sink.emit(&event).await.unwrap();
        let data = capture.last_data();
        assert_eq!(
            data["argv"],
            json!(["keep-this"]),
            "argv not in field list, must be kept"
        );
        assert_eq!(data["path"], "[redacted]", "path is in field list");
    }
}