use bytes::{Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum TransformError {
#[error("Transformation failed: {0}")]
Failed(String),
#[error("Invalid transformation configuration: {0}")]
InvalidConfig(String),
#[error("Transformation not found: {0}")]
NotFound(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
}
/// Per-request information handed to every transformation alongside the object bytes.
#[derive(Clone, Debug)]
pub struct TransformContext {
    /// Key of the object being transformed (presumably the object key within `bucket`).
    pub key: String,
    /// Bucket the object belongs to.
    pub bucket: String,
    /// Current content type; overwritten by `TransformRegistry::apply_chain` whenever a
    /// step in the chain reports a new one.
    pub content_type: String,
    /// Request headers; not read by the built-in transformations.
    pub headers: HashMap<String, String>,
    /// Free-form parameters; the built-in `add_prefix` / `add_suffix` transformations read
    /// the `"prefix"` / `"suffix"` keys from here.
    pub metadata: HashMap<String, String>,
}
/// Output of a single transformation (or of a whole chain).
#[derive(Debug)]
pub struct TransformResult {
    /// Transformed object bytes.
    pub data: Bytes,
    /// New content type for the output, or `None` to leave the current one unchanged.
    pub content_type: Option<String>,
    /// Extra metadata produced by the transformation; merged across steps by
    /// `TransformRegistry::apply_chain`.
    pub metadata: HashMap<String, String>,
}
impl TransformResult {
    /// Wraps `data` with no content-type override and empty metadata.
    pub fn new(data: Bytes) -> Self {
        let metadata = HashMap::new();
        Self {
            data,
            content_type: None,
            metadata,
        }
    }

    /// Wraps `data`, reporting `content_type` as the output's new content type.
    /// Metadata starts empty.
    pub fn with_content_type(data: Bytes, content_type: String) -> Self {
        Self {
            content_type: Some(content_type),
            ..Self::new(data)
        }
    }
}
/// Shared, thread-safe handle to a registered transformation: it takes the input bytes
/// plus the request context and returns the transformed result or an error.
pub type TransformFn = Arc<
    dyn Fn(&Bytes, &TransformContext) -> Result<TransformResult, TransformError> + Send + Sync,
>;
/// Named pipeline of transformations, registered with `TransformRegistry::register_config`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ObjectLambdaConfig {
    /// Name the config is registered and looked up under.
    pub name: String,
    /// ARN of the supporting access point; not read by `TransformRegistry`.
    pub supporting_access_point_arn: String,
    /// Ids of the registered transformations to run, applied in this order.
    pub transformation_ids: Vec<String>,
    /// When `false`, `TransformRegistry::apply_config` returns the input bytes untouched.
    pub enabled: bool,
}
/// Serializable description of a transformation and its parameters.
///
/// `TransformRegistry` does not reference this type; registry lookups use string ids.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum LambdaTransformationType {
    Redact {
        patterns: Vec<String>,
    },
    ToUpperCase,
    ToLowerCase,
    AddPrefix {
        prefix: String,
    },
    AddSuffix {
        suffix: String,
    },
    Replace {
        from: String,
        to: String,
    },
    Compress {
        algorithm: String,
    },
    Decompress {
        algorithm: String,
    },
    JsonTransform {
        path: String,
        operation: String,
    },
}
/// Registry of named transformations plus named `ObjectLambdaConfig` pipelines.
pub struct TransformRegistry {
    /// Transformations keyed by id.
    transformations: HashMap<String, TransformFn>,
    /// Configs keyed by `ObjectLambdaConfig::name`.
    configs: HashMap<String, ObjectLambdaConfig>,
}
impl TransformRegistry {
    /// Creates a registry pre-populated with the built-in transformations and no configs.
    pub fn new() -> Self {
        let mut registry = Self {
            transformations: HashMap::new(),
            configs: HashMap::new(),
        };
        registry.register_builtin_transformations();
        registry
    }

    /// Registers `transform` under `id`, replacing any transformation already registered
    /// under the same id.
    pub fn register<F>(&mut self, id: String, transform: F)
    where
        F: Fn(&Bytes, &TransformContext) -> Result<TransformResult, TransformError>
            + Send
            + Sync
            + 'static,
    {
        self.transformations.insert(id, Arc::new(transform));
    }

    /// Registers the built-in transformations:
    /// `to_uppercase`, `to_lowercase`, `add_prefix`, `add_suffix`, `redact_emails`,
    /// `redact_phones`, `redact_creditcards`, `json_prettify` and `json_minify`.
    ///
    /// Text-based transformations decode the input lossily: invalid UTF-8 sequences
    /// become U+FFFD in the output.
    fn register_builtin_transformations(&mut self) {
        self.register("to_uppercase".to_string(), |data, _ctx| {
            let text = String::from_utf8_lossy(data);
            Ok(TransformResult::new(Bytes::from(text.to_uppercase())))
        });
        self.register("to_lowercase".to_string(), |data, _ctx| {
            let text = String::from_utf8_lossy(data);
            Ok(TransformResult::new(Bytes::from(text.to_lowercase())))
        });
        // Prefix/suffix come from the request context's metadata; a missing key means
        // "no prefix/suffix", so the data passes through unchanged.
        self.register("add_prefix".to_string(), |data, ctx| {
            let prefix = ctx.metadata.get("prefix").map(|s| s.as_str()).unwrap_or("");
            let mut result = BytesMut::with_capacity(prefix.len() + data.len());
            result.extend_from_slice(prefix.as_bytes());
            result.extend_from_slice(data);
            Ok(TransformResult::new(result.freeze()))
        });
        self.register("add_suffix".to_string(), |data, ctx| {
            let suffix = ctx.metadata.get("suffix").map(|s| s.as_str()).unwrap_or("");
            let mut result = BytesMut::with_capacity(data.len() + suffix.len());
            result.extend_from_slice(data);
            result.extend_from_slice(suffix.as_bytes());
            Ok(TransformResult::new(result.freeze()))
        });
        // TLD class is `[A-Za-z]`: the previous `[A-Z|a-z]` also accepted a literal `|`,
        // because `|` is not an alternation operator inside a character class.
        self.register_regex_redaction(
            "redact_emails",
            r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            "[REDACTED-EMAIL]",
        );
        self.register_regex_redaction(
            "redact_phones",
            r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "[REDACTED-PHONE]",
        );
        self.register_regex_redaction(
            "redact_creditcards",
            r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
            "[REDACTED-CC]",
        );
        self.register("json_prettify".to_string(), |data, _ctx| {
            let value: serde_json::Value = serde_json::from_slice(data)
                .map_err(|e| TransformError::Failed(format!("Invalid JSON: {}", e)))?;
            let pretty = serde_json::to_string_pretty(&value)
                .map_err(|e| TransformError::Failed(e.to_string()))?;
            Ok(TransformResult::with_content_type(
                Bytes::from(pretty),
                "application/json".to_string(),
            ))
        });
        self.register("json_minify".to_string(), |data, _ctx| {
            let value: serde_json::Value = serde_json::from_slice(data)
                .map_err(|e| TransformError::Failed(format!("Invalid JSON: {}", e)))?;
            let minified =
                serde_json::to_string(&value).map_err(|e| TransformError::Failed(e.to_string()))?;
            Ok(TransformResult::with_content_type(
                Bytes::from(minified),
                "application/json".to_string(),
            ))
        });
    }

    /// Registers, under `id`, a transformation that replaces every match of `pattern`
    /// in the (lossily UTF-8 decoded) input with `replacement`.
    ///
    /// The regex is compiled once here and moved into the closure, so it is not
    /// recompiled on every invocation.
    ///
    /// # Panics
    /// Panics if `pattern` is not a valid regex. This helper is only called with the
    /// constant built-in patterns, so a failure is a programming error.
    fn register_regex_redaction(&mut self, id: &str, pattern: &str, replacement: &'static str) {
        let re = regex::Regex::new(pattern)
            .unwrap_or_else(|e| panic!("built-in pattern for `{}` is invalid: {}", id, e));
        self.register(id.to_string(), move |data, _ctx| {
            let text = String::from_utf8_lossy(data);
            let redacted = re.replace_all(&text, replacement);
            Ok(TransformResult::new(Bytes::from(redacted.into_owned())))
        });
    }

    /// Returns the transformation registered under `id`, if any.
    pub fn get(&self, id: &str) -> Option<&TransformFn> {
        self.transformations.get(id)
    }

    /// Runs the transformation registered under `id` on `data`.
    ///
    /// # Errors
    /// Returns `TransformError::NotFound(id)` if no such transformation exists; otherwise
    /// returns whatever the transformation itself returns.
    pub fn apply(
        &self,
        id: &str,
        data: &Bytes,
        context: &TransformContext,
    ) -> Result<TransformResult, TransformError> {
        let transform = self
            .get(id)
            .ok_or_else(|| TransformError::NotFound(id.to_string()))?;
        transform(data, context)
    }

    /// Runs the transformations in `ids` in order, feeding each output into the next.
    ///
    /// When a step reports a content type, it becomes the result's content type and is
    /// also written to `context.content_type`, so later steps see it. Metadata from all
    /// steps is merged; a later step's key overwrites an earlier one. An empty `ids`
    /// returns `data` unchanged with no content type.
    ///
    /// # Errors
    /// Stops at, and returns, the first error (including `NotFound` for an unknown id).
    pub fn apply_chain(
        &self,
        ids: &[String],
        mut data: Bytes,
        context: &mut TransformContext,
    ) -> Result<TransformResult, TransformError> {
        let mut final_content_type = None;
        let mut final_metadata = HashMap::new();
        for id in ids {
            let result = self.apply(id, &data, context)?;
            data = result.data;
            if let Some(ct) = result.content_type {
                final_content_type = Some(ct.clone());
                context.content_type = ct;
            }
            final_metadata.extend(result.metadata);
        }
        Ok(TransformResult {
            data,
            content_type: final_content_type,
            metadata: final_metadata,
        })
    }

    /// Registers `config` under its `name`, replacing any config with the same name.
    pub fn register_config(&mut self, config: ObjectLambdaConfig) {
        self.configs.insert(config.name.clone(), config);
    }

    /// Returns the config registered under `name`, if any.
    pub fn get_config(&self, name: &str) -> Option<&ObjectLambdaConfig> {
        self.configs.get(name)
    }

    /// Runs the chain of the config named `config_name` on `data`.
    ///
    /// A disabled config returns `data` unchanged and does not touch `context`.
    ///
    /// # Errors
    /// Returns `TransformError::NotFound("Config: <name>")` for an unknown config, or any
    /// error from `apply_chain`.
    pub fn apply_config(
        &self,
        config_name: &str,
        data: Bytes,
        context: &mut TransformContext,
    ) -> Result<TransformResult, TransformError> {
        let config = self
            .get_config(config_name)
            .ok_or_else(|| TransformError::NotFound(format!("Config: {}", config_name)))?;
        if !config.enabled {
            return Ok(TransformResult::new(data));
        }
        self.apply_chain(&config.transformation_ids, data, context)
    }

    /// Returns the ids of all registered transformations, sorted so the output is
    /// deterministic (plain `HashMap` iteration order is not).
    pub fn list_transformations(&self) -> Vec<String> {
        let mut ids: Vec<String> = self.transformations.keys().cloned().collect();
        ids.sort_unstable();
        ids
    }

    /// Returns the names of all registered configs, sorted for deterministic output.
    pub fn list_configs(&self) -> Vec<String> {
        let mut names: Vec<String> = self.configs.keys().cloned().collect();
        names.sort_unstable();
        names
    }
}
impl Default for TransformRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    fn test_context() -> TransformContext {
        TransformContext {
            key: "test.txt".to_string(),
            bucket: "test-bucket".to_string(),
            content_type: "text/plain".to_string(),
            headers: HashMap::new(),
            metadata: HashMap::new(),
        }
    }

    /// Decodes a transformation's output bytes as UTF-8 for string assertions.
    fn as_text(result: &TransformResult) -> String {
        String::from_utf8(result.data.to_vec()).expect("Failed to convert to UTF8")
    }

    #[test]
    fn test_uppercase_transformation() {
        let registry = TransformRegistry::new();
        let data = Bytes::from("hello world");
        let ctx = test_context();
        let result = registry
            .apply("to_uppercase", &data, &ctx)
            .expect("Failed to apply uppercase transformation");
        assert_eq!(result.data, Bytes::from("HELLO WORLD"));
    }

    #[test]
    fn test_lowercase_transformation() {
        let registry = TransformRegistry::new();
        let data = Bytes::from("HELLO WORLD");
        let ctx = test_context();
        let result = registry
            .apply("to_lowercase", &data, &ctx)
            .expect("Failed to apply lowercase transformation");
        assert_eq!(result.data, Bytes::from("hello world"));
    }

    #[test]
    fn test_redact_emails() {
        let registry = TransformRegistry::new();
        let data = Bytes::from("Contact us at support@example.com or admin@test.org");
        let ctx = test_context();
        let result = registry
            .apply("redact_emails", &data, &ctx)
            .expect("Failed to apply redact_emails transformation");
        let output = as_text(&result);
        assert!(output.contains("[REDACTED-EMAIL]"));
        assert!(!output.contains("support@example.com"));
        assert!(!output.contains("admin@test.org"));
    }

    #[test]
    fn test_redact_phones() {
        let registry = TransformRegistry::new();
        let data = Bytes::from("Call 555-123-4567 or 555.987.6543");
        let ctx = test_context();
        let result = registry
            .apply("redact_phones", &data, &ctx)
            .expect("Failed to apply redact_phones transformation");
        let output = as_text(&result);
        assert!(output.contains("[REDACTED-PHONE]"));
        assert!(!output.contains("555-123-4567"));
        // The dotted format must be redacted as well, not just the dashed one.
        assert!(!output.contains("555.987.6543"));
    }

    #[test]
    fn test_redact_creditcards() {
        let registry = TransformRegistry::new();
        let data = Bytes::from("Card: 1234-5678-9012-3456");
        let ctx = test_context();
        let result = registry
            .apply("redact_creditcards", &data, &ctx)
            .expect("Failed to apply redact_creditcards transformation");
        let output = as_text(&result);
        assert!(output.contains("[REDACTED-CC]"));
        assert!(!output.contains("1234-5678-9012-3456"));
    }

    #[test]
    fn test_json_prettify() {
        let registry = TransformRegistry::new();
        let data = Bytes::from(r#"{"name":"test","value":123}"#);
        let ctx = test_context();
        let result = registry
            .apply("json_prettify", &data, &ctx)
            .expect("Failed to apply json_prettify transformation");
        let output = as_text(&result);
        assert!(output.contains('\n'));
        assert!(output.contains("  "));
        assert_eq!(result.content_type.as_deref(), Some("application/json"));
    }

    #[test]
    fn test_json_minify() {
        let registry = TransformRegistry::new();
        let data = Bytes::from(r#"{ "name" : "test" }"#);
        let ctx = test_context();
        let result = registry
            .apply("json_minify", &data, &ctx)
            .expect("Failed to apply json_minify transformation");
        assert_eq!(result.data, Bytes::from(r#"{"name":"test"}"#));
        assert_eq!(result.content_type.as_deref(), Some("application/json"));
    }

    #[test]
    fn test_json_invalid_input_fails() {
        let registry = TransformRegistry::new();
        let ctx = test_context();
        let err = registry
            .apply("json_minify", &Bytes::from("not json"), &ctx)
            .expect_err("invalid JSON must be rejected");
        assert!(matches!(err, TransformError::Failed(_)));
    }

    #[test]
    fn test_transformation_chain() {
        let registry = TransformRegistry::new();
        let data = Bytes::from("hello world");
        let mut ctx = test_context();
        let transformations = vec!["to_uppercase".to_string()];
        let result = registry
            .apply_chain(&transformations, data, &mut ctx)
            .expect("Failed to apply transformation chain");
        assert_eq!(result.data, Bytes::from("HELLO WORLD"));
    }

    #[test]
    fn test_chain_propagates_content_type() {
        let registry = TransformRegistry::new();
        let mut ctx = test_context();
        let ids = ["json_minify".to_string(), "to_uppercase".to_string()];
        let result = registry
            .apply_chain(&ids, Bytes::from(r#"{ "a" : "b" }"#), &mut ctx)
            .expect("Failed to apply transformation chain");
        assert_eq!(result.data, Bytes::from(r#"{"A":"B"}"#));
        // to_uppercase reports no content type, so json_minify's one must survive.
        assert_eq!(result.content_type.as_deref(), Some("application/json"));
        assert_eq!(ctx.content_type, "application/json");
    }

    #[test]
    fn test_custom_transformation() {
        let mut registry = TransformRegistry::new();
        registry.register("reverse".to_string(), |data, _ctx| {
            let text = String::from_utf8_lossy(data);
            let reversed: String = text.chars().rev().collect();
            Ok(TransformResult::new(Bytes::from(reversed)))
        });
        let data = Bytes::from("hello");
        let ctx = test_context();
        let result = registry
            .apply("reverse", &data, &ctx)
            .expect("Failed to apply custom transformation");
        assert_eq!(result.data, Bytes::from("olleh"));
    }

    #[test]
    fn test_object_lambda_config() {
        let mut registry = TransformRegistry::new();
        let config = ObjectLambdaConfig {
            name: "pii-redaction".to_string(),
            supporting_access_point_arn:
                "arn:aws:s3:us-east-1:123456789012:accesspoint/my-access-point".to_string(),
            transformation_ids: vec!["redact_emails".to_string(), "redact_phones".to_string()],
            enabled: true,
        };
        registry.register_config(config);
        let data = Bytes::from("Contact: john@example.com, Phone: 555-123-4567");
        let mut ctx = test_context();
        let result = registry
            .apply_config("pii-redaction", data, &mut ctx)
            .expect("Failed to apply config");
        let output = as_text(&result);
        assert!(output.contains("[REDACTED-EMAIL]"));
        assert!(output.contains("[REDACTED-PHONE]"));
    }

    #[test]
    fn test_add_prefix_transformation() {
        let registry = TransformRegistry::new();
        let data = Bytes::from("world");
        let mut ctx = test_context();
        ctx.metadata
            .insert("prefix".to_string(), "Hello, ".to_string());
        let result = registry
            .apply("add_prefix", &data, &ctx)
            .expect("Failed to apply add_prefix transformation");
        assert_eq!(result.data, Bytes::from("Hello, world"));
    }

    #[test]
    fn test_add_prefix_without_metadata_is_identity() {
        let registry = TransformRegistry::new();
        let data = Bytes::from("world");
        let ctx = test_context();
        let result = registry
            .apply("add_prefix", &data, &ctx)
            .expect("Failed to apply add_prefix transformation");
        assert_eq!(result.data, data);
    }

    #[test]
    fn test_add_suffix_transformation() {
        let registry = TransformRegistry::new();
        let data = Bytes::from("Hello");
        let mut ctx = test_context();
        ctx.metadata
            .insert("suffix".to_string(), ", world".to_string());
        let result = registry
            .apply("add_suffix", &data, &ctx)
            .expect("Failed to apply add_suffix transformation");
        assert_eq!(result.data, Bytes::from("Hello, world"));
    }

    #[test]
    fn test_unknown_transformation_is_not_found() {
        let registry = TransformRegistry::new();
        let ctx = test_context();
        let err = registry
            .apply("does_not_exist", &Bytes::from("x"), &ctx)
            .expect_err("unknown transformation must fail");
        assert!(matches!(err, TransformError::NotFound(ref id) if id == "does_not_exist"));
    }

    #[test]
    fn test_unknown_config_is_not_found() {
        let registry = TransformRegistry::new();
        let mut ctx = test_context();
        let err = registry
            .apply_config("missing", Bytes::from("x"), &mut ctx)
            .expect_err("unknown config must fail");
        assert!(matches!(err, TransformError::NotFound(ref msg) if msg == "Config: missing"));
    }

    #[test]
    fn test_builtin_transformations_are_listed() {
        let registry = TransformRegistry::new();
        let names = registry.list_transformations();
        let expected = [
            "to_uppercase",
            "to_lowercase",
            "add_prefix",
            "add_suffix",
            "redact_emails",
            "redact_phones",
            "redact_creditcards",
            "json_prettify",
            "json_minify",
        ];
        assert_eq!(names.len(), expected.len());
        for id in expected {
            assert!(names.contains(&id.to_string()), "missing built-in {}", id);
        }
    }

    #[test]
    fn test_disabled_config() {
        let mut registry = TransformRegistry::new();
        let config = ObjectLambdaConfig {
            name: "disabled-transform".to_string(),
            supporting_access_point_arn:
                "arn:aws:s3:us-east-1:123456789012:accesspoint/my-access-point".to_string(),
            transformation_ids: vec!["to_uppercase".to_string()],
            enabled: false,
        };
        registry.register_config(config);
        let data = Bytes::from("hello");
        let mut ctx = test_context();
        let result = registry
            .apply_config("disabled-transform", data.clone(), &mut ctx)
            .expect("Failed to apply disabled config");
        assert_eq!(result.data, data);
        assert!(result.content_type.is_none());
        assert_eq!(ctx.content_type, "text/plain");
    }
}