cellos-sink-redact 0.5.1

Redacting wrapper EventSink for CellOS — strips configured fields from CloudEvents before forwarding to an inner sink.
Documentation
//! [`EventSink`] wrapper that redacts nominated JSON fields before forwarding.
//!
//! CloudEvents emitted by the supervisor can contain:
//! - `argv` — command lines, which may include secrets passed as arguments
//! - `path` — artifact and working-directory filesystem paths
//! - `secretRefs` — names (not values) of secrets referenced by the cell
//! - `reason` — teardown error messages, which may contain paths or argv fragments
//!
//! This sink wraps any `Arc<dyn EventSink>` and, before calling `inner.emit()`,
//! walks a copy of the `CloudEventV1.data` JSON tree, replacing the **value** of
//! every object key whose name is nominated — at any nesting depth, including
//! objects inside arrays — with a fixed placeholder (see *Security notes*).
//!
//! # Activation
//!
//! In the composition root, set `CELLOS_REDACT_EVENT_FIELDS` to a
//! comma-separated list of JSON field names to redact, e.g.:
//! ```text
//! CELLOS_REDACT_EVENT_FIELDS=argv,path,reason
//! ```
//!
//! Use `CELLOS_REDACT_EVENT_FIELDS=defaults` to apply the built-in
//! conservative set: `argv,path,reason`.
//!
//! # Security notes
//!
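//! - Secret **values** are not emitted as event fields of their own — only
//!   secret *names* (`secretRefs`) are — but a secret passed on a command line
//!   can still surface through `argv`, so keep `argv` in the redaction list.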
//! - Redaction applies to the JSON value tree, not to CloudEvent metadata fields
//!   (id, source, type, time).
//! - The placeholder value is the fixed string `"[redacted]"` for scalar values
//!   and `["[redacted]"]` for arrays; maps are replaced with `{}`.
//! - An absent or empty `data` field is forwarded unchanged.

use std::collections::HashSet;
use std::sync::Arc;

use async_trait::async_trait;
use cellos_core::ports::EventSink;
use cellos_core::{CellosError, CloudEventV1};
use serde_json::Value;

/// Placeholder text substituted for redacted values.
///
/// Scalars are replaced by this string and arrays by a one-element array
/// holding it; objects are replaced by an empty map instead (see
/// `redacted_placeholder`).
const REDACTED: &str = "[redacted]";

/// The conservative default set of field names to redact.
///
/// Used by [`RedactingEventSink::with_defaults`] and selected at runtime by
/// setting `CELLOS_REDACT_EVENT_FIELDS=defaults`. `secretRefs` is not part of
/// this set; name it explicitly to redact secret names as well.
pub const DEFAULT_REDACT_FIELDS: &[&str] = &["argv", "path", "reason"];

/// Wraps an inner [`EventSink`], walking the `data` JSON tree and replacing
/// the values of any nominated field names with a placeholder before the event
/// is forwarded.
///
/// The placeholder depends on the kind of value being replaced: `"[redacted]"`
/// for scalars, `["[redacted]"]` for arrays and `{}` for objects.
pub struct RedactingEventSink {
    /// Sink that receives the (possibly redacted) event.
    inner: Arc<dyn EventSink>,
    /// JSON object keys whose values are replaced; compared exactly against
    /// each key encountered in the `data` tree.
    fields: HashSet<String>,
}

impl RedactingEventSink {
    /// Create a new redacting sink with an explicit set of field names.
    ///
    /// Names are compared exactly (case-sensitively) against JSON object keys
    /// and duplicates collapse into one entry. An empty set turns the sink
    /// into a pure pass-through.
    pub fn new(
        inner: Arc<dyn EventSink>,
        fields: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        Self {
            inner,
            fields: fields.into_iter().map(Into::into).collect(),
        }
    }

    /// Create a new redacting sink with the default conservative field set:
    /// `argv`, `path`, `reason`.
    pub fn with_defaults(inner: Arc<dyn EventSink>) -> Self {
        Self::new(inner, DEFAULT_REDACT_FIELDS.iter().copied())
    }

    /// Build a sink from the `CELLOS_REDACT_EVENT_FIELDS` environment variable.
    ///
    /// The variable holds a comma-separated list of field names; whitespace
    /// around each name is ignored and empty entries are dropped. The single
    /// keyword `defaults` (case-insensitive) expands to
    /// [`DEFAULT_REDACT_FIELDS`].
    ///
    /// Returns `inner` unwrapped when the variable is unset, is not valid
    /// Unicode, is blank, or names no fields at all (for example `","`). The
    /// non-Unicode and no-fields cases log a warning, because the operator
    /// evidently meant to enable redaction and would otherwise get no signal
    /// that it is off.
    pub fn from_env(inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
        let raw = match std::env::var("CELLOS_REDACT_EVENT_FIELDS") {
            Ok(raw) => raw,
            Err(std::env::VarError::NotPresent) => return inner,
            Err(std::env::VarError::NotUnicode(_)) => {
                tracing::warn!(
                    "CELLOS_REDACT_EVENT_FIELDS is not valid Unicode; event redaction is disabled"
                );
                return inner;
            }
        };
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            return inner;
        }
        if trimmed.eq_ignore_ascii_case("defaults") {
            tracing::info!(fields = ?DEFAULT_REDACT_FIELDS, "event redaction enabled (defaults)");
            return Arc::new(Self::with_defaults(inner));
        }
        let fields: Vec<&str> = trimmed
            .split(',')
            .map(str::trim)
            .filter(|s| !s.is_empty())
            .collect();
        if fields.is_empty() {
            // Only separators were supplied. Wrapping would add an indirection
            // that redacts nothing while logging that redaction is enabled.
            tracing::warn!(
                value = %trimmed,
                "CELLOS_REDACT_EVENT_FIELDS names no fields; event redaction is disabled"
            );
            return inner;
        }
        tracing::info!(?fields, "event redaction enabled");
        Arc::new(Self::new(inner, fields))
    }

    /// Recursively redact `value` in place.
    ///
    /// For every object, a key contained in `fields` has its value replaced by
    /// the placeholder for that value's kind; the replaced subtree is not
    /// descended into. Values under other keys, and all array elements, are
    /// visited recursively. Scalars that are not under a nominated key are
    /// left untouched.
    fn redact_value(value: &mut Value, fields: &HashSet<String>) {
        match value {
            Value::Object(map) => {
                for (key, val) in map.iter_mut() {
                    if fields.contains(key.as_str()) {
                        *val = redacted_placeholder(val);
                    } else {
                        Self::redact_value(val, fields);
                    }
                }
            }
            Value::Array(arr) => {
                for item in arr.iter_mut() {
                    Self::redact_value(item, fields);
                }
            }
            _ => {}
        }
    }
}

/// Build the replacement for a redacted value, keeping the JSON kind of
/// containers so downstream consumers still see an array or an object.
///
/// Objects become an empty map, arrays become a one-element array holding the
/// placeholder string, and every other value becomes the placeholder string.
fn redacted_placeholder(original: &Value) -> Value {
    if original.is_object() {
        return Value::Object(serde_json::Map::new());
    }

    let marker = Value::String(REDACTED.into());
    if original.is_array() {
        Value::Array(vec![marker])
    } else {
        marker
    }
}

#[async_trait]
impl EventSink for RedactingEventSink {
    /// Forward `event` to the inner sink, redacting nominated fields first.
    ///
    /// When no field names are configured or the event carries no `data`, the
    /// original event is forwarded as-is. Otherwise the event is cloned, the
    /// clone's `data` tree is redacted, and the clone is forwarded; the
    /// caller's event is never modified. The inner sink's result is returned
    /// unchanged.
    async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
        let nothing_to_redact = event.data.is_none() || self.fields.is_empty();
        if nothing_to_redact {
            return self.inner.emit(event).await;
        }

        let mut sanitized = event.clone();
        if let Some(payload) = sanitized.data.as_mut() {
            Self::redact_value(payload, &self.fields);
        }

        self.inner.emit(&sanitized).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::Mutex;

    /// Environment variable consulted by `RedactingEventSink::from_env`.
    const ENV_VAR: &str = "CELLOS_REDACT_EVENT_FIELDS";

    /// Capture sink records the last event it received.
    struct CaptureSink(Mutex<Option<CloudEventV1>>);

    impl CaptureSink {
        fn new() -> Arc<Self> {
            Arc::new(Self(Mutex::new(None)))
        }
        fn last(&self) -> Option<CloudEventV1> {
            self.0.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl EventSink for CaptureSink {
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            *self.0.lock().unwrap() = Some(event.clone());
            Ok(())
        }
    }

    /// Build a minimal JSON CloudEvent carrying `data` as its payload.
    fn test_event(data: Value) -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: "test-id".into(),
            source: "test".into(),
            ty: "test.event".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(data),
            time: None,
            traceparent: None,
        }
    }

    #[tokio::test]
    async fn redacts_argv_array() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["argv"]);
        let event =
            test_event(json!({ "argv": ["/usr/bin/sh", "-c", "echo hi"], "other": "keep" }));
        sink.emit(&event).await.unwrap();
        let got = capture.last().unwrap();
        let data = got.data.unwrap();
        assert_eq!(
            data["argv"],
            json!(["[redacted]"]),
            "argv should be redacted"
        );
        assert_eq!(data["other"], "keep", "other fields preserved");
    }

    #[tokio::test]
    async fn redacts_scalar_path() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["path"]);
        let event = test_event(json!({ "path": "/tmp/sensitive/artifact.tar.gz", "name": "keep" }));
        sink.emit(&event).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(data["path"], "[redacted]");
        assert_eq!(data["name"], "keep");
    }

    #[tokio::test]
    async fn redacts_nested_fields() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["argv"]);
        let event = test_event(json!({
            "run": { "argv": ["secret-program", "--token=abc123"], "env": {} },
            "id": "cell-1"
        }));
        sink.emit(&event).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(data["run"]["argv"], json!(["[redacted]"]));
        assert_eq!(data["id"], "cell-1");
    }

    /// The placeholder keeps the JSON kind of containers, and objects nested
    /// inside arrays are visited too.
    #[tokio::test]
    async fn placeholder_shape_follows_original_value_kind() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["reason"]);
        let event = test_event(json!({
            "steps": [
                { "reason": { "code": 1 } },
                { "reason": 42 },
                { "reason": ["a", "b"] }
            ]
        }));
        sink.emit(&event).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(
            data,
            json!({
                "steps": [
                    { "reason": {} },
                    { "reason": "[redacted]" },
                    { "reason": ["[redacted]"] }
                ]
            })
        );
    }

    #[tokio::test]
    async fn passes_through_when_no_fields() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), std::iter::empty::<&str>());
        let original = json!({ "argv": ["important"] });
        let event = test_event(original.clone());
        sink.emit(&event).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(data, original);
    }

    #[tokio::test]
    async fn passes_through_events_without_data() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::with_defaults(capture.clone());
        let event = CloudEventV1 {
            specversion: "1.0".into(),
            id: "no-data".into(),
            source: "test".into(),
            ty: "test.event".into(),
            datacontenttype: None,
            data: None,
            time: None,
            traceparent: None,
        };
        sink.emit(&event).await.unwrap();
        assert!(capture.last().unwrap().data.is_none());
    }

    /// Guards env-var mutation so from_env tests don't race with each other.
    static FROM_ENV_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Run `from_env` with the variable set to `value` (or removed for `None`).
    ///
    /// Setting, reading and cleaning up all happen under one guard, so the
    /// variable never leaks into another test even if a later assertion in the
    /// caller panics. A poisoned lock is tolerated: the guarded data is `()`,
    /// so one failed test should not cascade into the others.
    fn sink_from_env(value: Option<&str>, inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
        let _guard = FROM_ENV_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        match value {
            Some(v) => std::env::set_var(ENV_VAR, v),
            None => std::env::remove_var(ENV_VAR),
        }
        let sink = RedactingEventSink::from_env(inner);
        std::env::remove_var(ENV_VAR);
        sink
    }

    #[tokio::test]
    async fn from_env_noop_when_var_absent() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(None, capture.clone());
        let original = json!({ "argv": ["not-redacted"], "path": "/kept", "reason": "kept" });
        sink.emit(&test_event(original.clone())).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(data, original, "no redaction when the variable is unset");
    }

    #[tokio::test]
    async fn from_env_blank_or_separator_only_value_redacts_nothing() {
        for value in ["   ", ",", " , ,"] {
            let capture = CaptureSink::new();
            let sink = sink_from_env(Some(value), capture.clone());
            let original = json!({ "argv": ["kept"], "path": "/kept" });
            sink.emit(&test_event(original.clone())).await.unwrap();
            let data = capture.last().unwrap().data.unwrap();
            assert_eq!(data, original, "value {value:?} must not redact anything");
        }
    }

    #[tokio::test]
    async fn from_env_defaults_keyword_redacts_default_fields() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some("defaults"), capture.clone());
        let event = test_event(json!({
            "argv": ["/usr/bin/sh", "-c", "echo secret"],
            "path": "/tmp/secret-file",
            "reason": "exit 1",
            "other": "keep"
        }));
        sink.emit(&event).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(data["argv"], json!(["[redacted]"]), "argv in defaults");
        assert_eq!(data["path"], "[redacted]", "path in defaults");
        assert_eq!(data["reason"], "[redacted]", "reason in defaults");
        assert_eq!(data["other"], "keep", "non-default field preserved");
    }

    #[tokio::test]
    async fn from_env_explicit_field_list_redacts_only_named_fields() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some("path"), capture.clone());
        let event = test_event(json!({ "argv": ["keep-this"], "path": "/secret/path" }));
        sink.emit(&event).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(
            data["argv"],
            json!(["keep-this"]),
            "argv not in field list, must be kept"
        );
        assert_eq!(data["path"], "[redacted]", "path is in field list");
    }

    #[tokio::test]
    async fn from_env_trims_whitespace_and_skips_empty_entries() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some(" path , argv ,"), capture.clone());
        let event = test_event(json!({
            "argv": ["secret"],
            "path": "/secret/path",
            "other": "keep"
        }));
        sink.emit(&event).await.unwrap();
        let data = capture.last().unwrap().data.unwrap();
        assert_eq!(data["argv"], json!(["[redacted]"]));
        assert_eq!(data["path"], "[redacted]");
        assert_eq!(data["other"], "keep");
    }
}