enact_core/streaming/protection/
encryption.rs1use super::context::ProtectionContext;
9use super::processor::{OutputProcessor, ProcessedEvent};
10use crate::streaming::StreamEvent;
11use aes_gcm::{
12 aead::{Aead, KeyInit},
13 Aes256Gcm, Nonce,
14};
15use async_trait::async_trait;
16use rand::RngCore;
17
/// Raw 256-bit (32-byte) symmetric key consumed by [`EncryptionProcessor`].
///
/// The bytes are handed to AES-256-GCM unchanged; no key derivation or
/// stretching is applied in this module.
pub type EncryptionKey = [u8; 32];
20
21pub struct EncryptionProcessor {
40 enabled: bool,
42 key: EncryptionKey,
44}
45
46impl EncryptionProcessor {
47 pub fn new() -> Self {
51 Self {
52 enabled: false,
53 key: [0u8; 32],
54 }
55 }
56
57 pub fn with_key(mut self, key: EncryptionKey) -> Self {
59 self.key = key;
60 self
61 }
62
63 pub fn enabled(mut self) -> Self {
71 if std::env::var("ENACT_PRODUCTION")
72 .map(|v| v.to_lowercase() == "true")
73 .unwrap_or(false)
74 {
75 panic!(
76 "EncryptionProcessor is DEVELOPMENT-ONLY and cannot be used in production. \
77 Set up a production-grade encryption solution with KMS integration. \
78 See documentation for details."
79 );
80 }
81 self.enabled = true;
82 self
83 }
84
85 pub fn is_enabled(&self) -> bool {
87 self.enabled
88 }
89
90 fn encrypt_text(&self, text: &str) -> anyhow::Result<String> {
92 let key = aes_gcm::Key::<Aes256Gcm>::from_slice(&self.key);
93 let cipher = Aes256Gcm::new(key);
94
95 let mut nonce_bytes = [0u8; 12];
96 rand::thread_rng().fill_bytes(&mut nonce_bytes);
97 let nonce = Nonce::from_slice(&nonce_bytes);
98
99 let ciphertext = cipher
100 .encrypt(nonce, text.as_bytes())
101 .map_err(|e| anyhow::anyhow!("encryption failed: {:?}", e))?;
102
103 let mut payload = Vec::with_capacity(nonce_bytes.len() + ciphertext.len());
105 payload.extend_from_slice(&nonce_bytes);
106 payload.extend_from_slice(&ciphertext);
107
108 Ok(format!("ENC:{}", hex::encode(payload)))
109 }
110
111 fn should_encrypt_event(&self, event: &StreamEvent, ctx: &ProtectionContext) -> bool {
113 if !ctx.destination.requires_encryption() {
115 return false;
116 }
117
118 matches!(
120 event,
121 StreamEvent::TextDelta { .. }
122 | StreamEvent::StepEnd {
123 output: Some(_),
124 ..
125 }
126 | StreamEvent::ExecutionEnd {
127 final_output: Some(_),
128 ..
129 }
130 )
131 }
132
133 fn encrypt_event(&self, event: StreamEvent) -> anyhow::Result<(StreamEvent, Option<String>)> {
135 match event {
136 StreamEvent::TextDelta { id, delta } => {
137 let encrypted = self.encrypt_text(&delta)?;
138 Ok((
139 StreamEvent::TextDelta {
140 id,
141 delta: "[ENCRYPTED]".to_string(),
142 },
143 Some(encrypted),
144 ))
145 }
146 StreamEvent::StepEnd {
147 execution_id,
148 step_id,
149 output: Some(text),
150 duration_ms,
151 timestamp,
152 } => {
153 let encrypted = self.encrypt_text(&text)?;
154 Ok((
155 StreamEvent::StepEnd {
156 execution_id,
157 step_id,
158 output: Some("[ENCRYPTED]".to_string()),
159 duration_ms,
160 timestamp,
161 },
162 Some(encrypted),
163 ))
164 }
165 StreamEvent::ExecutionEnd {
166 execution_id,
167 final_output: Some(text),
168 duration_ms,
169 timestamp,
170 } => {
171 let encrypted = self.encrypt_text(&text)?;
172 Ok((
173 StreamEvent::ExecutionEnd {
174 execution_id,
175 final_output: Some("[ENCRYPTED]".to_string()),
176 duration_ms,
177 timestamp,
178 },
179 Some(encrypted),
180 ))
181 }
182 _ => Ok((event, None)),
183 }
184 }
185}
186
187impl Default for EncryptionProcessor {
188 fn default() -> Self {
189 Self::new()
190 }
191}
192
193#[async_trait]
194impl OutputProcessor for EncryptionProcessor {
195 fn name(&self) -> &str {
196 "encryption"
197 }
198
199 async fn process(
200 &self,
201 event: StreamEvent,
202 ctx: &ProtectionContext,
203 ) -> anyhow::Result<ProcessedEvent> {
204 if !self.enabled {
205 return Ok(ProcessedEvent::unchanged(event));
206 }
207
208 tracing::warn!(
209 target: "enact_core::encryption",
210 "DEVELOPMENT-ONLY encryption in use. Do not use in production. Implement KMS-backed encryption instead."
211 );
212
213 if !self.should_encrypt_event(&event, ctx) {
214 return Ok(ProcessedEvent::unchanged(event));
215 }
216
217 let (encrypted_event, encrypted_payload) = self.encrypt_event(event)?;
218
219 Ok(ProcessedEvent {
220 event: encrypted_event,
221 was_modified: true,
222 encrypted_payload,
223 })
224 }
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::ExecutionId;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes every test that mutates the process-wide `ENACT_PRODUCTION`
    /// variable; the test harness runs tests on multiple threads.
    static ENACT_PRODUCTION_TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires the environment lock, recovering it if a previous test
    /// panicked while holding it. The guarded value is `()`, so a poisoned
    /// lock carries no corrupted state and one failure must not cascade.
    fn env_lock() -> MutexGuard<'static, ()> {
        ENACT_PRODUCTION_TEST_LOCK
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
    }

    /// RAII guard: sets `ENACT_PRODUCTION` while holding the environment lock
    /// and removes the variable again on drop, including during a panic.
    struct ProductionFlag {
        _lock: MutexGuard<'static, ()>,
    }

    impl ProductionFlag {
        fn set(value: &str) -> Self {
            let lock = env_lock();
            std::env::set_var("ENACT_PRODUCTION", value);
            Self { _lock: lock }
        }
    }

    impl Drop for ProductionFlag {
        fn drop(&mut self) {
            // Runs before `_lock` is released, so no other test can observe
            // the variable between removal and unlock.
            std::env::remove_var("ENACT_PRODUCTION");
        }
    }

    /// Builds an enabled processor with `ENACT_PRODUCTION=false` in effect.
    /// The guard is dropped before returning so it is never held across an
    /// `.await`.
    fn dev_processor() -> EncryptionProcessor {
        let _flag = ProductionFlag::set("false");
        EncryptionProcessor::new().enabled()
    }

    #[tokio::test]
    async fn test_encryption_processor_name() {
        let processor = EncryptionProcessor::new();
        assert_eq!(processor.name(), "encryption");
    }

    #[tokio::test]
    async fn test_encryption_processor_disabled_by_default() {
        let processor = EncryptionProcessor::new();
        assert!(!processor.is_enabled());
    }

    #[tokio::test]
    async fn test_encryption_processor_can_enable() {
        let processor = dev_processor();
        assert!(processor.is_enabled());
    }

    #[tokio::test]
    async fn test_encryption_skips_when_disabled() {
        let processor = EncryptionProcessor::new();
        let ctx = ProtectionContext::for_storage();
        let event = StreamEvent::text_delta("id", "secret data");

        let result = processor.process(event, &ctx).await.unwrap();

        assert!(!result.was_modified);
    }

    #[tokio::test]
    async fn test_encryption_skips_streaming_destination() {
        let processor = dev_processor();
        let ctx = ProtectionContext::for_stream();
        let event = StreamEvent::text_delta("id", "secret data");

        let result = processor.process(event, &ctx).await.unwrap();

        assert!(!result.was_modified);
    }

    #[tokio::test]
    async fn test_encryption_encrypts_for_storage() {
        let processor = dev_processor().with_key([1u8; 32]);
        let ctx = ProtectionContext::for_storage();
        let event = StreamEvent::text_delta("id", "secret data");

        let result = processor.process(event, &ctx).await.unwrap();

        assert!(result.was_modified);

        if let StreamEvent::TextDelta { delta, .. } = result.event {
            assert_eq!(delta, "[ENCRYPTED]");
        } else {
            panic!("Expected TextDelta");
        }

        let payload = result
            .encrypted_payload
            .expect("expected encrypted payload");
        assert!(payload.starts_with("ENC:"));
        assert!(
            !payload.contains("secret data"),
            "ciphertext should not contain plaintext"
        );
    }

    #[tokio::test]
    async fn test_encrypted_payload_round_trips() {
        let key = [1u8; 32];
        let processor = dev_processor().with_key(key);
        let ctx = ProtectionContext::for_storage();
        let event = StreamEvent::text_delta("id", "secret data");

        let result = processor.process(event, &ctx).await.unwrap();
        let payload = result
            .encrypted_payload
            .expect("expected encrypted payload");

        // Payload layout: "ENC:" + hex(12-byte nonce || ciphertext).
        let hex_body = payload.strip_prefix("ENC:").expect("missing ENC: prefix");
        let raw = hex::decode(hex_body).expect("payload should be valid hex");
        assert!(raw.len() > 12, "payload must hold a nonce and ciphertext");
        let (nonce_bytes, ciphertext) = raw.split_at(12);

        let cipher = Aes256Gcm::new(aes_gcm::Key::<Aes256Gcm>::from_slice(&key));
        let plaintext = cipher
            .decrypt(Nonce::from_slice(nonce_bytes), ciphertext)
            .expect("payload should decrypt with the configured key");

        assert_eq!(plaintext, b"secret data");
    }

    #[tokio::test]
    async fn test_encryption_control_events_pass_through() {
        let processor = dev_processor();
        let ctx = ProtectionContext::for_storage();
        let exec_id = ExecutionId::new();
        let event = StreamEvent::execution_start(&exec_id);

        let result = processor.process(event, &ctx).await.unwrap();

        assert!(!result.was_modified);
    }

    #[test]
    fn test_encryption_panics_in_production_mode() {
        let _flag = ProductionFlag::set("true");
        let result = std::panic::catch_unwind(|| {
            let _ = EncryptionProcessor::new().enabled();
        });

        assert!(result.is_err(), "Should panic when ENACT_PRODUCTION=true");
    }

    #[tokio::test]
    async fn test_encryption_works_when_production_false() {
        let processor = dev_processor();

        assert!(processor.is_enabled());
    }
}