Skip to main content

codetether_agent/event_stream/
s3_sink.rs

//! S3/R2 Object Storage Sink for Event Streams
//!
//! Automatically archives JSONL event stream files to S3-compatible storage
//! (AWS S3, Cloudflare R2, MinIO, etc.) for immutable, tamper-proof archival.
//!
//! This is essential for compliance: SOC 2, FedRAMP, and ATO processes require
//! permanent, unmodifiable records of AI agent actions.
//!
//! ## Usage
//!
//! ```rust,ignore
//! use codetether_agent::event_stream::s3_sink::S3Sink;
//! use std::path::PathBuf;
//!
//! // The endpoint and both credentials are `Option<String>`; pass `None`
//! // for the endpoint to target AWS S3 itself.
//! let sink = S3Sink::new(
//!     "audit-logs-bucket".to_string(),
//!     "events/".to_string(),
//!     Some("https://account-id.r2.cloudflarestorage.com".to_string()),
//!     Some("access-key".to_string()),
//!     Some("secret-key".to_string()),
//! ).await?;
//!
//! // Upload event file to S3/R2
//! sink.upload_file(&PathBuf::from("/local/path/events.jsonl"), "session-id").await?;
//! ```
25
26use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap, HeaderValue};
27use reqwest::{Client, Method};
28use serde::{Deserialize, Serialize};
29use std::path::PathBuf;
30use thiserror::Error;
31
32/// Errors that can occur during S3 operations
33#[derive(Error, Debug)]
34pub enum S3SinkError {
35    #[error("HTTP error: {0}")]
36    Http(#[from] reqwest::Error),
37
38    #[error("IO error: {0}")]
39    Io(#[from] std::io::Error),
40
41    #[error("Missing S3 configuration: {0}")]
42    MissingConfig(String),
43
44    #[error("Upload failed: {0}")]
45    UploadFailed(String),
46
47    #[error("S3 returned error: {0}")]
48    S3Error(String),
49}
50
51/// Configuration for S3/R2 sink
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct S3SinkConfig {
54    /// S3 bucket name
55    pub bucket: String,
56    /// Path prefix for uploaded files
57    pub prefix: String,
58    /// S3 endpoint URL (None for AWS S3, Some for R2/MinIO)
59    #[serde(default, skip_serializing_if = "Option::is_none")]
60    pub endpoint: Option<String>,
61    /// S3 access key
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub access_key: Option<String>,
64    /// S3 secret key
65    #[serde(default, skip_serializing_if = "Option::is_none")]
66    pub secret_key: Option<String>,
67    /// Region (default: us-east-1)
68    #[serde(default = "default_region")]
69    pub region: String,
70    /// Stub mode - skip actual HTTP requests when S3 unavailable
71    #[serde(default)]
72    pub stub_mode: bool,
73}
74
/// Region used when none is configured; also the serde default for
/// `S3SinkConfig::region`.
fn default_region() -> String {
    String::from("us-east-1")
}
78
79impl S3SinkConfig {
80    /// Create config from environment variables
81    ///
82    /// Supports both `S3_*` (preferred) and `CODETETHER_S3_*` (legacy) environment variables.
83    /// The `S3_*` variables take precedence over `CODETETHER_S3_*` variables for backwards compatibility.
84    ///
85    /// # Environment Variables
86    /// - `S3_BUCKET` (or `CODETETHER_S3_BUCKET`): S3 bucket name
87    /// - `S3_PREFIX` (or `CODETETHER_S3_PREFIX`): Path prefix for uploads (default: "events/")
88    /// - `S3_ENDPOINT` (or `CODETETHER_S3_ENDPOINT`): Custom endpoint for R2/MinIO
89    /// - `S3_ACCESS_KEY` (or `CODETETHER_S3_ACCESS_KEY`): Access key
90    /// - `S3_SECRET_KEY` (or `CODETETHER_S3_SECRET_KEY`): Secret key
91    /// - `S3_REGION` (or `CODETETHER_S3_REGION`, `AWS_REGION`): Region (default: "us-east-1")
92    /// - `S3_STUB_MODE` (or `CODETETHER_S3_STUB_MODE`): Enable stub mode (skip actual uploads)
93    pub fn from_env() -> Option<Self> {
94        // Try S3_* first (new naming), fall back to CODETETHER_S3_* (legacy)
95        let bucket = std::env::var("S3_BUCKET")
96            .or_else(|_| std::env::var("CODETETHER_S3_BUCKET"))
97            .ok()?;
98
99        let prefix = std::env::var("S3_PREFIX")
100            .or_else(|_| std::env::var("CODETETHER_S3_PREFIX"))
101            .unwrap_or_else(|_| "events/".to_string());
102
103        let endpoint = std::env::var("S3_ENDPOINT")
104            .or_else(|_| std::env::var("CODETETHER_S3_ENDPOINT"))
105            .ok();
106
107        let access_key = std::env::var("S3_ACCESS_KEY")
108            .or_else(|_| std::env::var("CODETETHER_S3_ACCESS_KEY"))
109            .ok();
110
111        let secret_key = std::env::var("S3_SECRET_KEY")
112            .or_else(|_| std::env::var("CODETETHER_S3_SECRET_KEY"))
113            .ok();
114
115        let region = std::env::var("S3_REGION")
116            .or_else(|_| std::env::var("CODETETHER_S3_REGION"))
117            .or_else(|_| std::env::var("AWS_REGION"))
118            .unwrap_or_else(|_| "us-east-1".to_string());
119
120        let stub_mode = std::env::var("S3_STUB_MODE")
121            .or_else(|_| std::env::var("CODETETHER_S3_STUB_MODE"))
122            .map(|v| v.to_lowercase() == "true" || v == "1")
123            .unwrap_or(false);
124
125        // Log deprecation warning when using legacy CODETETHER_S3_* variables
126        if std::env::var("CODETETHER_S3_BUCKET").is_ok() && std::env::var("S3_BUCKET").is_err() {
127            tracing::warn!(
128                "Using legacy CODETETHER_S3_* environment variables. These are deprecated. Please migrate to S3_* naming convention."
129            );
130        }
131
132        Some(Self {
133            bucket,
134            prefix,
135            endpoint,
136            access_key,
137            secret_key,
138            region,
139            stub_mode,
140        })
141    }
142}
143
144/// S3/R2 sink for event stream archival
145pub struct S3Sink {
146    client: Client,
147    config: S3SinkConfig,
148}
149
150impl S3Sink {
151    #[allow(dead_code)]
152    /// Create a new S3 sink
153    pub async fn new(
154        bucket: String,
155        prefix: String,
156        endpoint: Option<String>,
157        access_key: Option<String>,
158        secret_key: Option<String>,
159    ) -> Result<Self, S3SinkError> {
160        let config = S3SinkConfig {
161            bucket,
162            prefix,
163            endpoint,
164            access_key,
165            secret_key,
166            region: "us-east-1".to_string(),
167            stub_mode: false,
168        };
169
170        Self::from_config(config).await
171    }
172
173    /// Create S3 sink from configuration
174    pub async fn from_config(config: S3SinkConfig) -> Result<Self, S3SinkError> {
175        let client = Client::builder().build().map_err(S3SinkError::Http)?;
176
177        Ok(Self { client, config })
178    }
179
180    /// Create S3 sink from environment variables
181    pub async fn from_env() -> Result<Self, S3SinkError> {
182        let config = S3SinkConfig::from_env().ok_or_else(|| {
183            S3SinkError::MissingConfig("S3_BUCKET (or CODETETHER_S3_BUCKET) not set".to_string())
184        })?;
185
186        Self::from_config(config).await
187    }
188
189    /// Build the S3 endpoint URL
190    fn endpoint_url(&self) -> String {
191        if let Some(ref endpoint) = self.config.endpoint {
192            // Custom endpoint (R2, MinIO, etc.)
193            format!("{}/{}", endpoint.trim_end_matches('/'), self.config.bucket)
194        } else {
195            // AWS S3
196            format!(
197                "https://{}.s3.{}.amazonaws.com",
198                self.config.bucket, self.config.region
199            )
200        }
201    }
202
203    /// Generate AWS Authorization header
204    fn generate_aws_header(
205        &self,
206        method: &Method,
207        path: &str,
208        query: &str,
209        content_hash: &str,
210        date: &str,
211    ) -> String {
212        let access_key = self.config.access_key.as_deref().unwrap_or("");
213        let secret_key = self.config.secret_key.as_deref().unwrap_or("");
214
215        let canonical_request = format!(
216            "{}\n{}\n{}\n\ncontent-type:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n\ncontent-type;x-amz-content-sha256;x-amz-date",
217            method, path, query, "application/octet-stream", content_hash, date
218        );
219
220        // String to sign
221        let canonical_request_hash = sha256_hex(&canonical_request);
222        let credential_scope = format!("{}/{}/s3/aws4_request", &date[..8], self.config.region);
223        let string_to_sign = format!(
224            "AWS4-HMAC-SHA256\n{}\n{}\n{}",
225            date, credential_scope, canonical_request_hash
226        );
227
228        // Signing key
229        let k_date = hmac_sha256(
230            format!("AWS4{}", secret_key).as_bytes(),
231            date[..8].as_bytes(),
232        );
233        let k_region = hmac_sha256(&k_date, self.config.region.as_bytes());
234        let k_service = hmac_sha256(&k_region, b"s3");
235        let k_signing = hmac_sha256(&k_service, b"aws4_request");
236
237        // Signature
238        let signature = hex::encode(hmac_sha256(&k_signing, string_to_sign.as_bytes()));
239
240        format!(
241            "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders=content-type;x-amz-content-sha256;x-amz-date, Signature={}",
242            access_key, credential_scope, signature
243        )
244    }
245
246    /// Upload a local file to S3/R2
247    pub async fn upload_file(
248        &self,
249        local_path: &PathBuf,
250        session_id: &str,
251    ) -> Result<String, S3SinkError> {
252        let filename = local_path
253            .file_name()
254            .and_then(|n| n.to_str())
255            .ok_or_else(|| S3SinkError::UploadFailed("Invalid filename".to_string()))?;
256
257        // Read file contents
258        let data = tokio::fs::read(local_path).await?;
259
260        // S3 key: prefix/session_id/timestamp-filename
261        let s3_key = format!("{}{}/{}", self.config.prefix, session_id, filename);
262
263        // Upload to S3
264        self.upload_bytes(&data, &s3_key, "application/json").await
265    }
266
267    /// Upload bytes directly to S3/R2
268    pub async fn upload_bytes(
269        &self,
270        data: &[u8],
271        s3_key: &str,
272        _content_type: &str,
273    ) -> Result<String, S3SinkError> {
274        // Stub mode: skip actual HTTP request
275        if self.config.stub_mode {
276            let url = if let Some(ref endpoint) = self.config.endpoint {
277                format!("{}/{}", endpoint.trim_end_matches('/'), s3_key)
278            } else {
279                format!("s3://{}/{}", self.config.bucket, s3_key)
280            };
281
282            tracing::debug!(
283                key = %s3_key,
284                bytes = data.len(),
285                "[STUB MODE] Skipping S3 upload"
286            );
287
288            return Ok(url);
289        }
290
291        let endpoint = self.endpoint_url();
292        let path = format!("/{}", s3_key);
293
294        // Generate AWS Signature V4 headers
295        let now = chrono::Utc::now();
296        let date = now.format("%Y%m%dT%H%M%SZ").to_string();
297
298        let content_hash = sha256_hex_bytes(data);
299
300        let method = Method::PUT;
301        let auth_header = self.generate_aws_header(&method, &path, "", &content_hash, &date);
302
303        let mut headers = HeaderMap::new();
304        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
305        headers.insert(
306            "x-amz-date",
307            HeaderValue::from_str(&date).map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
308        );
309        headers.insert(
310            "x-amz-content-sha256",
311            HeaderValue::from_str(&content_hash)
312                .map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
313        );
314        headers.insert(
315            AUTHORIZATION,
316            HeaderValue::from_str(&auth_header)
317                .map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
318        );
319
320        let url = format!("{}{}", endpoint, path);
321
322        let response = self
323            .client
324            .put(&url)
325            .headers(headers)
326            .body(data.to_vec())
327            .send()
328            .await?;
329
330        if response.status().is_success() {
331            let url = if let Some(ref endpoint) = self.config.endpoint {
332                format!("{}/{}", endpoint.trim_end_matches('/'), s3_key)
333            } else {
334                format!(
335                    "https://{}.s3.{}.amazonaws.com/{}",
336                    self.config.bucket, self.config.region, s3_key
337                )
338            };
339
340            tracing::info!("Uploaded event stream to S3: {}", url);
341            Ok(url)
342        } else {
343            let status = response.status();
344            let body = response.text().await.unwrap_or_default();
345            Err(S3SinkError::S3Error(format!("{}: {}", status, body)))
346        }
347    }
348
349    /// Check if S3 sink is configured
350    ///
351    /// Returns true if either `CODETETHER_S3_BUCKET` or `S3_BUCKET` is set.
352    pub fn is_configured() -> bool {
353        std::env::var("CODETETHER_S3_BUCKET")
354            .or_else(|_| std::env::var("S3_BUCKET"))
355            .is_ok()
356    }
357
358    #[allow(dead_code)]
359    /// Get the bucket name
360    pub fn bucket_name(&self) -> &str {
361        &self.config.bucket
362    }
363
364    #[allow(dead_code)]
365    /// Get the prefix
366    pub fn prefix(&self) -> &str {
367        &self.config.prefix
368    }
369}
370
371/// Compute SHA256 hex digest
372fn sha256_hex(s: &str) -> String {
373    use std::collections::hash_map::DefaultHasher;
374    use std::hash::{Hash, Hasher};
375
376    let mut hasher = DefaultHasher::new();
377    s.hash(&mut hasher);
378    format!("{:016x}{:016x}", hasher.finish(), hasher.finish())
379}
380
381/// Compute SHA256 hex digest of bytes
382fn sha256_hex_bytes(data: &[u8]) -> String {
383    use sha2::{Digest, Sha256};
384    let mut hasher = Sha256::new();
385    hasher.update(data);
386    hex::encode(hasher.finalize())
387}
388
389/// Compute HMAC-SHA256
390fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
391    use hmac::{Hmac, Mac};
392    type HmacSha256 = Hmac<sha2::Sha256>;
393
394    let mut mac = HmacSha256::new_from_slice(key).expect("HMAC can take key of any size");
395    mac.update(data);
396    mac.finalize().into_bytes().to_vec()
397}
398
399/// Event file with S3 archival status
400#[derive(Debug, Clone, Serialize, Deserialize)]
401#[allow(dead_code)]
402pub struct ArchivedEventFile {
403    /// Local path where the file was written
404    pub local_path: PathBuf,
405    /// S3 URL where the file was archived
406    pub s3_url: String,
407    /// Session ID
408    pub session_id: String,
409    /// Byte range of events in this file
410    pub start_offset: u64,
411    pub end_offset: u64,
412    /// When the file was archived
413    pub archived_at: chrono::DateTime<chrono::Utc>,
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, OnceLock};

    /// Every environment variable `S3SinkConfig::from_env` consults.
    const ENV_VARS: [&str; 15] = [
        "CODETETHER_S3_BUCKET",
        "CODETETHER_S3_PREFIX",
        "CODETETHER_S3_ENDPOINT",
        "CODETETHER_S3_REGION",
        "CODETETHER_S3_ACCESS_KEY",
        "CODETETHER_S3_SECRET_KEY",
        "CODETETHER_S3_STUB_MODE",
        "S3_BUCKET",
        "S3_PREFIX",
        "S3_ENDPOINT",
        "S3_REGION",
        "S3_ACCESS_KEY",
        "S3_SECRET_KEY",
        "S3_STUB_MODE",
        "AWS_REGION",
    ];

    /// Serialize the tests in this module, since they all mutate the process
    /// environment.
    ///
    /// A poisoned lock is recovered rather than unwrapped: the mutex guards no
    /// data, and every test resets the environment before use, so one failing
    /// test should not cascade into failures of the others.
    fn env_lock() -> MutexGuard<'static, ()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Remove every variable the config reads so each test starts clean.
    fn clean_env() {
        for name in ENV_VARS {
            // SAFETY: callers hold `env_lock`, so tests in this module do not
            // touch the environment concurrently. Code outside this module
            // that reads the environment on another thread is not covered.
            unsafe {
                std::env::remove_var(name);
            }
        }
    }

    /// Set one environment variable for the current test.
    fn set_env(name: &str, value: &str) {
        // SAFETY: same reasoning as in `clean_env`.
        unsafe {
            std::env::set_var(name, value);
        }
    }

    #[test]
    fn test_config_from_env() {
        let _guard = env_lock();
        clean_env();

        set_env("CODETETHER_S3_BUCKET", "test-bucket");
        set_env("CODETETHER_S3_PREFIX", "audit/");
        set_env(
            "CODETETHER_S3_ENDPOINT",
            "https://test.r2.cloudflarestorage.com",
        );

        let cfg = S3SinkConfig::from_env().expect("bucket is set, so a config is expected");
        assert_eq!(cfg.bucket, "test-bucket");
        assert_eq!(cfg.prefix, "audit/");
        assert_eq!(
            cfg.endpoint,
            Some("https://test.r2.cloudflarestorage.com".to_string())
        );

        clean_env();
    }

    #[test]
    fn test_config_defaults() {
        let _guard = env_lock();
        clean_env();

        set_env("CODETETHER_S3_BUCKET", "test-bucket");

        let config = S3SinkConfig::from_env().expect("bucket is set, so a config is expected");
        assert_eq!(config.region, "us-east-1");
        assert_eq!(config.prefix, "events/");
        assert_eq!(config.endpoint, None);
        assert_eq!(config.access_key, None);
        assert_eq!(config.secret_key, None);
        assert!(!config.stub_mode);

        clean_env();
    }

    #[test]
    fn test_config_requires_bucket() {
        let _guard = env_lock();
        clean_env();

        // Other settings alone are not enough to produce a config.
        set_env("S3_PREFIX", "audit/");
        assert!(S3SinkConfig::from_env().is_none());

        clean_env();
    }

    #[test]
    fn test_new_names_take_precedence_over_legacy() {
        let _guard = env_lock();
        clean_env();

        set_env("CODETETHER_S3_BUCKET", "legacy-bucket");
        set_env("S3_BUCKET", "new-bucket");
        set_env("CODETETHER_S3_PREFIX", "legacy/");

        let config = S3SinkConfig::from_env().expect("bucket is set, so a config is expected");
        assert_eq!(config.bucket, "new-bucket");
        // Settings without an `S3_*` value still fall back to the legacy name.
        assert_eq!(config.prefix, "legacy/");

        clean_env();
    }

    #[test]
    fn test_region_fallback_order() {
        let _guard = env_lock();
        clean_env();

        set_env("S3_BUCKET", "test-bucket");
        set_env("AWS_REGION", "eu-west-1");
        let config = S3SinkConfig::from_env().expect("bucket is set, so a config is expected");
        assert_eq!(config.region, "eu-west-1");

        set_env("S3_REGION", "auto");
        let config = S3SinkConfig::from_env().expect("bucket is set, so a config is expected");
        assert_eq!(config.region, "auto");

        clean_env();
    }

    #[test]
    fn test_stub_mode_parsing() {
        let _guard = env_lock();
        clean_env();

        set_env("S3_BUCKET", "test-bucket");

        for (raw, expected) in [("1", true), ("true", true), ("TRUE", true), ("yes", false)] {
            set_env("S3_STUB_MODE", raw);
            let config = S3SinkConfig::from_env().expect("bucket is set, so a config is expected");
            assert_eq!(config.stub_mode, expected, "S3_STUB_MODE={}", raw);
        }

        clean_env();
    }

    #[test]
    fn test_is_configured() {
        let _guard = env_lock();
        clean_env();

        assert!(
            !S3Sink::is_configured(),
            "S3 sink should not be configured by default"
        );

        set_env("CODETETHER_S3_BUCKET", "test-bucket");
        assert!(
            S3Sink::is_configured(),
            "S3 sink should be configured when bucket is set"
        );

        clean_env();
    }
}