// codetether_agent/event_stream/s3_sink.rs
1//! S3/R2 Object Storage Sink for Event Streams
2//!
3//! Automatically archives JSONL event stream files to S3-compatible storage
4//! (AWS S3, Cloudflare R2, MinIO, etc.) for immutable, tamper-proof archival.
5//!
6//! This is essential for compliance: SOC 2, FedRAMP, ATO processes require
7//! permanent, unmodifiable records of AI agent actions.
8//!
9//! ## Usage
10//!
11//! ```rust,ignore
12//! use codetether_agent::event_stream::s3_sink::S3Sink;
13//!
14//! let sink = S3Sink::new(
15//!     "audit-logs-bucket".to_string(),
16//!     "events/".to_string(),
17//!     "https://account-id.r2.cloudflarestorage.com".to_string(),
18//!     "access-key".to_string(),
19//!     "secret-key".to_string(),
20//! ).await?;
21//!
22//! // Upload event file to S3/R2
23//! sink.upload_file("/local/path/events.jsonl", "session-id").await?;
24//! ```
25
use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap, HeaderValue};
use reqwest::{Client, Method};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use thiserror::Error;
31
/// Errors that can occur during S3 operations
#[derive(Error, Debug)]
pub enum S3SinkError {
    /// Transport-level failure from the underlying `reqwest` HTTP client.
    #[error("HTTP error: {0}")]
    Http(#[from] reqwest::Error),

    /// Local filesystem failure (e.g. reading the event file to upload).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Required configuration (bucket, credentials, …) was not provided.
    #[error("Missing S3 configuration: {0}")]
    MissingConfig(String),

    /// Client-side upload failure (invalid filename, malformed header value).
    #[error("Upload failed: {0}")]
    UploadFailed(String),

    /// The S3-compatible service responded with a non-success status.
    #[error("S3 returned error: {0}")]
    S3Error(String),
}
50
/// Configuration for S3/R2 sink
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct S3SinkConfig {
    /// S3 bucket name
    pub bucket: String,
    /// Path prefix for uploaded files (e.g. "events/"; keys become `{prefix}{session_id}/{file}`)
    pub prefix: String,
    /// S3 endpoint URL (None for AWS S3, Some for R2/MinIO)
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub endpoint: Option<String>,
    /// S3 access key
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub access_key: Option<String>,
    /// S3 secret key
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub secret_key: Option<String>,
    /// Region (default: us-east-1)
    #[serde(default = "default_region")]
    pub region: String,
    /// Stub mode - skip actual HTTP requests when S3 unavailable
    #[serde(default)]
    pub stub_mode: bool,
}
74
/// Serde default for [`S3SinkConfig::region`].
fn default_region() -> String {
    String::from("us-east-1")
}
78
79impl S3SinkConfig {
80    /// Create config from environment variables
81    ///
82    /// Supports both `S3_*` (preferred) and `CODETETHER_S3_*` (legacy) environment variables.
83    /// The `S3_*` variables take precedence over `CODETETHER_S3_*` variables for backwards compatibility.
84    ///
85    /// # Environment Variables
86    /// - `S3_BUCKET` (or `CODETETHER_S3_BUCKET`): S3 bucket name
87    /// - `S3_PREFIX` (or `CODETETHER_S3_PREFIX`): Path prefix for uploads (default: "events/")
88    /// - `S3_ENDPOINT` (or `CODETETHER_S3_ENDPOINT`): Custom endpoint for R2/MinIO
89    /// - `S3_ACCESS_KEY` (or `CODETETHER_S3_ACCESS_KEY`): Access key
90    /// - `S3_SECRET_KEY` (or `CODETETHER_S3_SECRET_KEY`): Secret key
91    /// - `S3_REGION` (or `CODETETHER_S3_REGION`, `AWS_REGION`): Region (default: "us-east-1")
92    /// - `S3_STUB_MODE` (or `CODETETHER_S3_STUB_MODE`): Enable stub mode (skip actual uploads)
93    pub fn from_env() -> Option<Self> {
94        // Try S3_* first (new naming), fall back to CODETETHER_S3_* (legacy)
95        let bucket = std::env::var("S3_BUCKET")
96            .or_else(|_| std::env::var("CODETETHER_S3_BUCKET"))
97            .ok()?;
98
99        let prefix = std::env::var("S3_PREFIX")
100            .or_else(|_| std::env::var("CODETETHER_S3_PREFIX"))
101            .unwrap_or_else(|_| "events/".to_string());
102
103        let endpoint = std::env::var("S3_ENDPOINT")
104            .or_else(|_| std::env::var("CODETETHER_S3_ENDPOINT"))
105            .ok();
106
107        let access_key = std::env::var("S3_ACCESS_KEY")
108            .or_else(|_| std::env::var("CODETETHER_S3_ACCESS_KEY"))
109            .ok();
110
111        let secret_key = std::env::var("S3_SECRET_KEY")
112            .or_else(|_| std::env::var("CODETETHER_S3_SECRET_KEY"))
113            .ok();
114
115        let region = std::env::var("S3_REGION")
116            .or_else(|_| std::env::var("CODETETHER_S3_REGION"))
117            .or_else(|_| std::env::var("AWS_REGION"))
118            .unwrap_or_else(|_| "us-east-1".to_string());
119
120        let stub_mode = std::env::var("S3_STUB_MODE")
121            .or_else(|_| std::env::var("CODETETHER_S3_STUB_MODE"))
122            .map(|v| v.to_lowercase() == "true" || v == "1")
123            .unwrap_or(false);
124
125        // Log deprecation warning when using legacy CODETETHER_S3_* variables
126        if std::env::var("CODETETHER_S3_BUCKET").is_ok() && std::env::var("S3_BUCKET").is_err() {
127            tracing::warn!(
128                "Using legacy CODETETHER_S3_* environment variables. These are deprecated. Please migrate to S3_* naming convention."
129            );
130        }
131
132        Some(Self {
133            bucket,
134            prefix,
135            endpoint,
136            access_key,
137            secret_key,
138            region,
139            stub_mode,
140        })
141    }
142}
143
/// S3/R2 sink for event stream archival
pub struct S3Sink {
    // Shared reqwest client reused across all uploads.
    client: Client,
    // Resolved configuration: bucket, prefix, credentials, region, stub flag.
    config: S3SinkConfig,
}
149
150impl S3Sink {
151    #[allow(dead_code)]
152    /// Create a new S3 sink
153    pub async fn new(
154        bucket: String,
155        prefix: String,
156        endpoint: Option<String>,
157        access_key: Option<String>,
158        secret_key: Option<String>,
159    ) -> Result<Self, S3SinkError> {
160        let config = S3SinkConfig {
161            bucket,
162            prefix,
163            endpoint,
164            access_key,
165            secret_key,
166            region: "us-east-1".to_string(),
167            stub_mode: false,
168        };
169
170        Self::from_config(config).await
171    }
172
173    /// Create S3 sink from configuration
174    pub async fn from_config(config: S3SinkConfig) -> Result<Self, S3SinkError> {
175        let client = Client::builder().build().map_err(S3SinkError::Http)?;
176
177        Ok(Self { client, config })
178    }
179
180    /// Create S3 sink from environment variables
181    pub async fn from_env() -> Result<Self, S3SinkError> {
182        let config = S3SinkConfig::from_env().ok_or_else(|| {
183            S3SinkError::MissingConfig("S3_BUCKET (or CODETETHER_S3_BUCKET) not set".to_string())
184        })?;
185
186        Self::from_config(config).await
187    }
188
189    /// Build the S3 endpoint URL
190    fn endpoint_url(&self) -> String {
191        if let Some(ref endpoint) = self.config.endpoint {
192            // Custom endpoint (R2, MinIO, etc.)
193            format!("{}/{}", endpoint.trim_end_matches('/'), self.config.bucket)
194        } else {
195            // AWS S3
196            format!(
197                "https://{}.s3.{}.amazonaws.com",
198                self.config.bucket, self.config.region
199            )
200        }
201    }
202
203    /// Generate AWS Authorization header
204    fn generate_aws_header(
205        &self,
206        method: &Method,
207        path: &str,
208        query: &str,
209        content_hash: &str,
210        date: &str,
211    ) -> String {
212        let access_key = self.config.access_key.as_deref().unwrap_or("");
213        let secret_key = self.config.secret_key.as_deref().unwrap_or("");
214
215        let canonical_request = format!(
216            "{}\n{}\n{}\n\ncontent-type:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n\ncontent-type;x-amz-content-sha256;x-amz-date",
217            method, path, query, "application/octet-stream", content_hash, date
218        );
219
220        // String to sign
221        let canonical_request_hash = sha256_hex(&canonical_request);
222        let credential_scope = format!("{}/{}/s3/aws4_request", &date[..8], self.config.region);
223        let string_to_sign = format!(
224            "AWS4-HMAC-SHA256\n{}\n{}\n{}",
225            date, credential_scope, canonical_request_hash
226        );
227
228        // Signing key
229        let k_date = hmac_sha256(
230            format!("AWS4{}", secret_key).as_bytes(),
231            date[..8].as_bytes(),
232        );
233        let k_region = hmac_sha256(&k_date, self.config.region.as_bytes());
234        let k_service = hmac_sha256(&k_region, b"s3");
235        let k_signing = hmac_sha256(&k_service, b"aws4_request");
236
237        // Signature
238        let signature = hex::encode(hmac_sha256(&k_signing, string_to_sign.as_bytes()));
239
240        format!(
241            "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders=content-type;x-amz-content-sha256;x-amz-date, Signature={}",
242            access_key, credential_scope, signature
243        )
244    }
245
246    /// Upload a local file to S3/R2
247    pub async fn upload_file(
248        &self,
249        local_path: &PathBuf,
250        session_id: &str,
251    ) -> Result<String, S3SinkError> {
252        let filename = local_path
253            .file_name()
254            .and_then(|n| n.to_str())
255            .ok_or_else(|| S3SinkError::UploadFailed("Invalid filename".to_string()))?;
256
257        // Read file contents
258        let data = tokio::fs::read(local_path).await?;
259
260        // S3 key: prefix/session_id/timestamp-filename
261        let s3_key = format!("{}{}/{}", self.config.prefix, session_id, filename);
262
263        // Upload to S3
264        self.upload_bytes(&data, &s3_key, "application/json").await
265    }
266
267    /// Upload bytes directly to S3/R2
268    pub async fn upload_bytes(
269        &self,
270        data: &[u8],
271        s3_key: &str,
272        _content_type: &str,
273    ) -> Result<String, S3SinkError> {
274        // Stub mode: skip actual HTTP request
275        if self.config.stub_mode {
276            let url = if let Some(ref endpoint) = self.config.endpoint {
277                format!("{}/{}", endpoint.trim_end_matches('/'), s3_key)
278            } else {
279                format!("s3://{}/{}", self.config.bucket, s3_key)
280            };
281
282            tracing::debug!(
283                key = %s3_key,
284                bytes = data.len(),
285                "[STUB MODE] Skipping S3 upload"
286            );
287
288            return Ok(url);
289        }
290
291        use std::time::{SystemTime, UNIX_EPOCH};
292
293        let endpoint = self.endpoint_url();
294        let path = format!("/{}", s3_key);
295
296        // Generate AWS Signature V4 headers
297        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
298        let date = chrono::DateTime::from_timestamp(now.as_secs() as i64, 0)
299            .map(|dt| dt.format("%Y%m%dT%H%M%SZ").to_string())
300            .unwrap_or_default();
301        let _date_only = &date[..8];
302
303        let content_hash = sha256_hex_bytes(data);
304
305        let method = Method::PUT;
306        let auth_header = self.generate_aws_header(&method, &path, "", &content_hash, &date);
307
308        let mut headers = HeaderMap::new();
309        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
310        headers.insert(
311            "x-amz-date",
312            HeaderValue::from_str(&date).map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
313        );
314        headers.insert(
315            "x-amz-content-sha256",
316            HeaderValue::from_str(&content_hash)
317                .map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
318        );
319        headers.insert(
320            AUTHORIZATION,
321            HeaderValue::from_str(&auth_header)
322                .map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
323        );
324
325        let url = format!("{}{}", endpoint, path);
326
327        let response = self
328            .client
329            .put(&url)
330            .headers(headers)
331            .body(data.to_vec())
332            .send()
333            .await?;
334
335        if response.status().is_success() {
336            let url = if let Some(ref endpoint) = self.config.endpoint {
337                format!("{}/{}", endpoint.trim_end_matches('/'), s3_key)
338            } else {
339                format!(
340                    "https://{}.s3.{}.amazonaws.com/{}",
341                    self.config.bucket, self.config.region, s3_key
342                )
343            };
344
345            tracing::info!("Uploaded event stream to S3: {}", url);
346            Ok(url)
347        } else {
348            let status = response.status();
349            let body = response.text().await.unwrap_or_default();
350            Err(S3SinkError::S3Error(format!("{}: {}", status, body)))
351        }
352    }
353
354    /// Check if S3 sink is configured
355    ///
356    /// Returns true if either `CODETETHER_S3_BUCKET` or `S3_BUCKET` is set.
357    pub fn is_configured() -> bool {
358        std::env::var("CODETETHER_S3_BUCKET")
359            .or_else(|_| std::env::var("S3_BUCKET"))
360            .is_ok()
361    }
362
363    #[allow(dead_code)]
364    /// Get the bucket name
365    pub fn bucket_name(&self) -> &str {
366        &self.config.bucket
367    }
368
369    #[allow(dead_code)]
370    /// Get the prefix
371    pub fn prefix(&self) -> &str {
372        &self.config.prefix
373    }
374}
375
376/// Compute SHA256 hex digest
377fn sha256_hex(s: &str) -> String {
378    use std::collections::hash_map::DefaultHasher;
379    use std::hash::{Hash, Hasher};
380
381    let mut hasher = DefaultHasher::new();
382    s.hash(&mut hasher);
383    format!("{:016x}{:016x}", hasher.finish(), hasher.finish())
384}
385
386/// Compute SHA256 hex digest of bytes
387fn sha256_hex_bytes(data: &[u8]) -> String {
388    use sha2::{Digest, Sha256};
389    let mut hasher = Sha256::new();
390    hasher.update(data);
391    hex::encode(hasher.finalize())
392}
393
394/// Compute HMAC-SHA256
395fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
396    use hmac::{Hmac, Mac};
397    type HmacSha256 = Hmac<sha2::Sha256>;
398
399    let mut mac = HmacSha256::new_from_slice(key).expect("HMAC can take key of any size");
400    mac.update(data);
401    mac.finalize().into_bytes().to_vec()
402}
403
/// Event file with S3 archival status
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)]
pub struct ArchivedEventFile {
    /// Local path where the file was written
    pub local_path: PathBuf,
    /// S3 URL where the file was archived
    pub s3_url: String,
    /// Session ID
    pub session_id: String,
    /// Start of the byte range of events covered by this file
    pub start_offset: u64,
    /// End of the byte range of events covered by this file
    pub end_offset: u64,
    /// When the file was archived
    pub archived_at: chrono::DateTime<chrono::Utc>,
}
420
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, OnceLock};

    /// Process-wide lock serializing tests that mutate environment variables.
    fn env_lock() -> &'static Mutex<()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
    }

    /// Acquire the env lock, recovering from poisoning.
    ///
    /// BUGFIX: `lock().expect(..)` meant one panicking test poisoned the
    /// mutex and made every subsequent env test fail spuriously; the guarded
    /// data is `()`, so recovering the guard is always safe.
    fn lock_env() -> MutexGuard<'static, ()> {
        env_lock()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    // Helper to ensure clean environment before each test
    fn clean_env() {
        // Note: set_var/remove_var are unsafe in Rust 2024+, but this is test-only code
        unsafe {
            // Remove both legacy CODETETHER_S3_* and new S3_* variables
            std::env::remove_var("CODETETHER_S3_BUCKET");
            std::env::remove_var("CODETETHER_S3_PREFIX");
            std::env::remove_var("CODETETHER_S3_ENDPOINT");
            std::env::remove_var("CODETETHER_S3_REGION");
            std::env::remove_var("CODETETHER_S3_ACCESS_KEY");
            std::env::remove_var("CODETETHER_S3_SECRET_KEY");
            std::env::remove_var("CODETETHER_S3_STUB_MODE");
            std::env::remove_var("S3_BUCKET");
            std::env::remove_var("S3_PREFIX");
            std::env::remove_var("S3_ENDPOINT");
            std::env::remove_var("S3_REGION");
            std::env::remove_var("S3_ACCESS_KEY");
            std::env::remove_var("S3_SECRET_KEY");
            std::env::remove_var("S3_STUB_MODE");
            std::env::remove_var("AWS_REGION");
        }
    }

    #[test]
    fn test_config_from_env() {
        let _lock = lock_env();
        // Always start clean
        clean_env();

        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
            std::env::set_var("CODETETHER_S3_PREFIX", "audit/");
            std::env::set_var(
                "CODETETHER_S3_ENDPOINT",
                "https://test.r2.cloudflarestorage.com",
            );
        }

        let config = S3SinkConfig::from_env();
        assert!(config.is_some());

        let cfg = config.unwrap();
        assert_eq!(cfg.bucket, "test-bucket");
        assert_eq!(cfg.prefix, "audit/");
        assert_eq!(
            cfg.endpoint,
            Some("https://test.r2.cloudflarestorage.com".to_string())
        );

        // Clean up (skipped if an assert above panicked; the next test's
        // leading clean_env() restores isolation regardless)
        clean_env();
    }

    #[test]
    fn test_config_defaults() {
        let _lock = lock_env();
        clean_env();

        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
        }

        let config = S3SinkConfig::from_env().unwrap();
        assert_eq!(config.region, "us-east-1");

        clean_env();
    }

    #[test]
    fn test_is_configured() {
        let _lock = lock_env();
        // Always start clean - remove the var first to ensure isolation
        clean_env();

        let is_not_configured = !S3Sink::is_configured();
        assert!(
            is_not_configured,
            "S3 sink should not be configured by default"
        );

        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
        }
        assert!(
            S3Sink::is_configured(),
            "S3 sink should be configured when bucket is set"
        );

        clean_env();
    }
}