use std::collections::HashSet;
use std::sync::Arc;
use async_trait::async_trait;
use cellos_core::ports::EventSink;
use cellos_core::{CellosError, CloudEventV1};
use serde_json::Value;
/// Text written in place of a redacted scalar value, and used as the sole
/// element of a redacted array.
const REDACTED: &str = "[redacted]";

/// Field names redacted by [`RedactingEventSink::with_defaults`] and by
/// `CELLOS_REDACT_EVENT_FIELDS=defaults`.
pub const DEFAULT_REDACT_FIELDS: &[&str] = &[
    "argv",
    "path",
    "reason",
];
/// [`EventSink`] decorator that scrubs selected JSON keys from an event's
/// `data` payload before forwarding the event to a wrapped sink.
pub struct RedactingEventSink {
    /// Object keys whose values are replaced wherever they appear in `data`.
    fields: HashSet<String>,
    /// Sink that receives the (possibly redacted) event.
    inner: Arc<dyn EventSink>,
}
impl RedactingEventSink {
    /// Wraps `inner` so that every JSON object key listed in `fields` has its
    /// value redacted from event `data`, at any nesting depth, before the event
    /// is forwarded. Keys are matched exactly (case-sensitive).
    pub fn new(
        inner: Arc<dyn EventSink>,
        fields: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        Self {
            inner,
            fields: fields.into_iter().map(Into::into).collect(),
        }
    }

    /// Wraps `inner`, redacting the keys in [`DEFAULT_REDACT_FIELDS`].
    pub fn with_defaults(inner: Arc<dyn EventSink>) -> Self {
        Self::new(inner, DEFAULT_REDACT_FIELDS.iter().copied())
    }

    /// Builds a sink configured by the `CELLOS_REDACT_EVENT_FIELDS` environment
    /// variable.
    ///
    /// * unset, empty, or whitespace-only: `inner` is returned unchanged;
    /// * `defaults` (ASCII case-insensitive, surrounding whitespace ignored):
    ///   redact [`DEFAULT_REDACT_FIELDS`];
    /// * anything else: a comma-separated list of key names. Whitespace around
    ///   names and empty entries are ignored. If no names remain, a warning is
    ///   logged and `inner` is returned unchanged.
    ///
    /// A value that is not valid Unicode is decoded lossily, with a warning.
    /// This keeps redaction active for every name that decodes cleanly,
    /// instead of silently disabling redaction.
    pub fn from_env(inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
        let Some(raw) = std::env::var_os("CELLOS_REDACT_EVENT_FIELDS") else {
            return inner;
        };
        let raw = raw.into_string().unwrap_or_else(|raw| {
            let lossy = raw.to_string_lossy().into_owned();
            tracing::warn!(
                value = %lossy,
                "CELLOS_REDACT_EVENT_FIELDS is not valid Unicode; decoding lossily"
            );
            lossy
        });
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            return inner;
        }
        if trimmed.eq_ignore_ascii_case("defaults") {
            tracing::info!(fields = ?DEFAULT_REDACT_FIELDS, "event redaction enabled (defaults)");
            return Arc::new(Self::with_defaults(inner));
        }
        let fields: Vec<&str> = trimmed
            .split(',')
            .map(str::trim)
            .filter(|s| !s.is_empty())
            .collect();
        if fields.is_empty() {
            // e.g. " , ," -- nothing to redact, so don't claim redaction is on.
            tracing::warn!(
                value = %trimmed,
                "CELLOS_REDACT_EVENT_FIELDS names no fields; event redaction disabled"
            );
            return inner;
        }
        tracing::info!(?fields, "event redaction enabled");
        Arc::new(Self::new(inner, fields))
    }

    /// Recursively walks `value` in place.
    ///
    /// The value of every object key contained in `fields` is replaced with
    /// [`redacted_placeholder`]. Replaced subtrees are not descended into; all
    /// other objects and array elements are searched recursively. Scalars that
    /// are not under a matching key are left untouched.
    fn redact_value(value: &mut Value, fields: &HashSet<String>) {
        match value {
            Value::Object(map) => {
                for (key, val) in map.iter_mut() {
                    if fields.contains(key.as_str()) {
                        *val = redacted_placeholder(val);
                    } else {
                        Self::redact_value(val, fields);
                    }
                }
            }
            Value::Array(arr) => {
                for item in arr.iter_mut() {
                    Self::redact_value(item, fields);
                }
            }
            _ => {}
        }
    }
}
/// Returns the value written in place of a redacted field.
///
/// The replacement keeps the original's JSON type for containers:
/// * an array becomes the one-element array `[REDACTED]`;
/// * an object becomes an empty object;
/// * anything else (string, number, bool, null) becomes the string `REDACTED`.
fn redacted_placeholder(original: &Value) -> Value {
    if original.is_array() {
        Value::Array(vec![Value::String(REDACTED.into())])
    } else if original.is_object() {
        Value::Object(serde_json::Map::new())
    } else {
        Value::String(REDACTED.into())
    }
}
#[async_trait]
impl EventSink for RedactingEventSink {
    /// Forwards `event` to the inner sink after scrubbing the configured keys
    /// from a clone of its `data`.
    ///
    /// When there is no `data`, or no keys are configured, the original event is
    /// forwarded as-is without cloning. Errors come from the inner sink.
    async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
        let needs_redaction = !self.fields.is_empty() && event.data.is_some();
        if !needs_redaction {
            return self.inner.emit(event).await;
        }
        let mut scrubbed = event.clone();
        if let Some(payload) = scrubbed.data.as_mut() {
            Self::redact_value(payload, &self.fields);
        }
        self.inner.emit(&scrubbed).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::{Mutex, PoisonError};

    const ENV_VAR: &str = "CELLOS_REDACT_EVENT_FIELDS";

    /// Sink that records the most recently emitted event for inspection.
    struct CaptureSink(Mutex<Option<CloudEventV1>>);

    impl CaptureSink {
        fn new() -> Arc<Self> {
            Arc::new(Self(Mutex::new(None)))
        }

        fn last(&self) -> Option<CloudEventV1> {
            self.0.lock().unwrap().clone()
        }

        /// `data` of the last captured event; panics if none was captured.
        fn last_data(&self) -> Value {
            self.last()
                .expect("an event should have been emitted")
                .data
                .expect("captured event should carry data")
        }
    }

    #[async_trait]
    impl EventSink for CaptureSink {
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            *self.0.lock().unwrap() = Some(event.clone());
            Ok(())
        }
    }

    fn test_event(data: Value) -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: "test-id".into(),
            source: "test".into(),
            ty: "test.event".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(data),
            time: None,
            traceparent: None,
        }
    }

    /// Serializes every test that touches `CELLOS_REDACT_EVENT_FIELDS`, because
    /// the process environment is shared by concurrently running tests.
    static FROM_ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Calls `RedactingEventSink::from_env` with the variable set to `value`
    /// (or removed when `None`). The variable is unset again before the lock
    /// is released.
    fn sink_from_env(value: Option<&str>, inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
        // Recover from poisoning so one failing test cannot cascade into the rest.
        let _guard = FROM_ENV_MUTEX
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        match value {
            Some(v) => std::env::set_var(ENV_VAR, v),
            None => std::env::remove_var(ENV_VAR),
        }
        let sink = RedactingEventSink::from_env(inner);
        std::env::remove_var(ENV_VAR);
        sink
    }

    #[tokio::test]
    async fn redacts_argv_array() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["argv"]);
        let event =
            test_event(json!({ "argv": ["/usr/bin/sh", "-c", "echo hi"], "other": "keep" }));
        sink.emit(&event).await.unwrap();
        let data = capture.last_data();
        assert_eq!(
            data["argv"],
            json!(["[redacted]"]),
            "argv should be redacted"
        );
        assert_eq!(data["other"], "keep", "other fields preserved");
    }

    #[tokio::test]
    async fn redacts_scalar_path() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["path"]);
        let event = test_event(json!({ "path": "/tmp/sensitive/artifact.tar.gz", "name": "keep" }));
        sink.emit(&event).await.unwrap();
        let data = capture.last_data();
        assert_eq!(data["path"], "[redacted]");
        assert_eq!(data["name"], "keep");
    }

    #[tokio::test]
    async fn redacts_nested_fields() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["argv"]);
        let event = test_event(json!({
            "run": { "argv": ["secret-program", "--token=abc123"], "env": {} },
            "id": "cell-1"
        }));
        sink.emit(&event).await.unwrap();
        let data = capture.last_data();
        assert_eq!(data["run"]["argv"], json!(["[redacted]"]));
        assert_eq!(data["id"], "cell-1");
    }

    #[tokio::test]
    async fn redacts_inside_arrays_and_replaces_objects_with_empty_map() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), ["argv", "reason"]);
        let event = test_event(json!({
            "steps": [{ "argv": ["a", "b"] }, { "name": "keep" }],
            "reason": { "code": 1, "detail": "secret" }
        }));
        sink.emit(&event).await.unwrap();
        let data = capture.last_data();
        assert_eq!(data["steps"][0]["argv"], json!(["[redacted]"]));
        assert_eq!(data["steps"][1]["name"], "keep");
        assert_eq!(data["reason"], json!({}));
    }

    #[tokio::test]
    async fn passes_through_when_no_fields() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::new(capture.clone(), std::iter::empty::<&str>());
        let original = json!({ "argv": ["important"] });
        sink.emit(&test_event(original.clone())).await.unwrap();
        assert_eq!(capture.last_data(), original);
    }

    #[tokio::test]
    async fn passes_through_events_without_data() {
        let capture = CaptureSink::new();
        let sink = RedactingEventSink::with_defaults(capture.clone());
        let event = CloudEventV1 {
            specversion: "1.0".into(),
            id: "no-data".into(),
            source: "test".into(),
            ty: "test.event".into(),
            datacontenttype: None,
            data: None,
            time: None,
            traceparent: None,
        };
        sink.emit(&event).await.unwrap();
        assert!(capture.last().unwrap().data.is_none());
    }

    #[tokio::test]
    async fn from_env_noop_when_var_absent() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(None, capture.clone());
        let original = json!({ "argv": ["not-redacted"], "path": "/kept" });
        sink.emit(&test_event(original.clone())).await.unwrap();
        assert_eq!(capture.last_data(), original, "no var means no redaction");
    }

    #[tokio::test]
    async fn from_env_separators_only_passes_through() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some(" , ,"), capture.clone());
        let original = json!({ "argv": ["keep"], "path": "/keep" });
        sink.emit(&test_event(original.clone())).await.unwrap();
        assert_eq!(capture.last_data(), original);
    }

    #[tokio::test]
    async fn from_env_defaults_keyword_redacts_default_fields() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some("defaults"), capture.clone());
        let event = test_event(json!({
            "argv": ["/usr/bin/sh", "-c", "echo secret"],
            "path": "/tmp/secret-file",
            "reason": "exit 1",
            "other": "keep"
        }));
        sink.emit(&event).await.unwrap();
        let data = capture.last_data();
        assert_eq!(data["argv"], json!(["[redacted]"]), "argv in defaults");
        assert_eq!(data["path"], "[redacted]", "path in defaults");
        assert_eq!(data["reason"], "[redacted]", "reason in defaults");
        assert_eq!(data["other"], "keep", "non-default field preserved");
    }

    #[tokio::test]
    async fn from_env_defaults_keyword_is_case_and_whitespace_insensitive() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some("  DEFAULTS \n"), capture.clone());
        sink.emit(&test_event(json!({ "reason": "exit 1" })))
            .await
            .unwrap();
        assert_eq!(capture.last_data()["reason"], "[redacted]");
    }

    #[tokio::test]
    async fn from_env_explicit_field_list_redacts_only_named_fields() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(Some(" path , "), capture.clone());
        let event = test_event(json!({ "argv": ["keep-this"], "path": "/secret/path" }));
        sink.emit(&event).await.unwrap();
        let data = capture.last_data();
        assert_eq!(
            data["argv"],
            json!(["keep-this"]),
            "argv not in field list, must be kept"
        );
        assert_eq!(data["path"], "[redacted]", "path is in field list");
    }
}