// codetether_agent/event_stream/s3_sink.rs
//! S3/R2 Object Storage Sink for Event Streams
//!
//! Automatically archives JSONL event stream files to S3-compatible storage
//! (AWS S3, Cloudflare R2, MinIO, etc.) for immutable, tamper-proof archival.
//!
//! This is essential for compliance: SOC 2, FedRAMP, ATO processes require
//! permanent, unmodifiable records of AI agent actions.
//!
//! ## Usage
//!
//! ```rust,ignore
//! use codetether_agent::event_stream::s3_sink::S3Sink;
//!
//! let sink = S3Sink::new(
//!     "audit-logs-bucket".to_string(),
//!     "events/".to_string(),
//!     Some("https://account-id.r2.cloudflarestorage.com".to_string()),
//!     Some("access-key".to_string()),
//!     Some("secret-key".to_string()),
//! ).await?;
//!
//! // Upload event file to S3/R2
//! sink.upload_file(std::path::Path::new("/local/path/events.jsonl"), "session-id").await?;
//! ```

use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap, HeaderValue};
use reqwest::{Client, Method};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use thiserror::Error;

32/// Errors that can occur during S3 operations
33#[derive(Error, Debug)]
34pub enum S3SinkError {
35    #[error("HTTP error: {0}")]
36    Http(#[from] reqwest::Error),
37
38    #[error("IO error: {0}")]
39    Io(#[from] std::io::Error),
40
41    #[error("Missing S3 configuration: {0}")]
42    MissingConfig(String),
43
44    #[error("Upload failed: {0}")]
45    UploadFailed(String),
46
47    #[error("S3 returned error: {0}")]
48    S3Error(String),
49}
50
51/// Configuration for S3/R2 sink
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct S3SinkConfig {
54    /// S3 bucket name
55    pub bucket: String,
56    /// Path prefix for uploaded files
57    pub prefix: String,
58    /// S3 endpoint URL (None for AWS S3, Some for R2/MinIO)
59    #[serde(default, skip_serializing_if = "Option::is_none")]
60    pub endpoint: Option<String>,
61    /// S3 access key
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub access_key: Option<String>,
64    /// S3 secret key
65    #[serde(default, skip_serializing_if = "Option::is_none")]
66    pub secret_key: Option<String>,
67    /// Region (default: us-east-1)
68    #[serde(default = "default_region")]
69    pub region: String,
70}
71
/// Fallback AWS region used when no region is configured.
fn default_region() -> String {
    String::from("us-east-1")
}

76impl S3SinkConfig {
77    /// Create config from environment variables
78    pub fn from_env() -> Option<Self> {
79        let bucket = std::env::var("CODETETHER_S3_BUCKET").ok()?;
80
81        Some(Self {
82            bucket,
83            prefix: std::env::var("CODETETHER_S3_PREFIX").unwrap_or_else(|_| "events/".to_string()),
84            endpoint: std::env::var("CODETETHER_S3_ENDPOINT").ok(),
85            access_key: std::env::var("CODETETHER_S3_ACCESS_KEY").ok(),
86            secret_key: std::env::var("CODETETHER_S3_SECRET_KEY").ok(),
87            region: std::env::var("CODETETHER_S3_REGION")
88                .unwrap_or_else(|_| "us-east-1".to_string()),
89        })
90    }
91}
92
93/// S3/R2 sink for event stream archival
94pub struct S3Sink {
95    client: Client,
96    config: S3SinkConfig,
97}
98
99impl S3Sink {
100    /// Create a new S3 sink
101    pub async fn new(
102        bucket: String,
103        prefix: String,
104        endpoint: Option<String>,
105        access_key: Option<String>,
106        secret_key: Option<String>,
107    ) -> Result<Self, S3SinkError> {
108        let config = S3SinkConfig {
109            bucket,
110            prefix,
111            endpoint,
112            access_key,
113            secret_key,
114            region: "us-east-1".to_string(),
115        };
116
117        Self::from_config(config).await
118    }
119
120    /// Create S3 sink from configuration
121    pub async fn from_config(config: S3SinkConfig) -> Result<Self, S3SinkError> {
122        let client = Client::builder()
123            .build()
124            .map_err(|e| S3SinkError::Http(e))?;
125
126        Ok(Self { client, config })
127    }
128
129    /// Create S3 sink from environment variables
130    pub async fn from_env() -> Result<Self, S3SinkError> {
131        let config = S3SinkConfig::from_env().ok_or_else(|| {
132            S3SinkError::MissingConfig("CODETETHER_S3_BUCKET not set".to_string())
133        })?;
134
135        Self::from_config(config).await
136    }
137
138    /// Build the S3 endpoint URL
139    fn endpoint_url(&self) -> String {
140        if let Some(ref endpoint) = self.config.endpoint {
141            // Custom endpoint (R2, MinIO, etc.)
142            format!("{}/{}", endpoint.trim_end_matches('/'), self.config.bucket)
143        } else {
144            // AWS S3
145            format!(
146                "https://{}.s3.{}.amazonaws.com",
147                self.config.bucket, self.config.region
148            )
149        }
150    }
151
152    /// Generate AWS Authorization header
153    fn generate_aws_header(
154        &self,
155        method: &Method,
156        path: &str,
157        query: &str,
158        content_hash: &str,
159        date: &str,
160    ) -> String {
161        let access_key = self.config.access_key.as_deref().unwrap_or("");
162        let secret_key = self.config.secret_key.as_deref().unwrap_or("");
163
164        let canonical_request = format!(
165            "{}\n{}\n{}\n\ncontent-type:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n\ncontent-type;x-amz-content-sha256;x-amz-date",
166            method, path, query, "application/octet-stream", content_hash, date
167        );
168
169        // String to sign
170        let canonical_request_hash = sha256_hex(&canonical_request);
171        let credential_scope = format!(
172            "{}/{}/s3/aws4_request",
173            date[..8].to_string(),
174            self.config.region
175        );
176        let string_to_sign = format!(
177            "AWS4-HMAC-SHA256\n{}\n{}\n{}",
178            date, credential_scope, canonical_request_hash
179        );
180
181        // Signing key
182        let k_date = hmac_sha256(
183            format!("AWS4{}", secret_key).as_bytes(),
184            date[..8].as_bytes(),
185        );
186        let k_region = hmac_sha256(&k_date, self.config.region.as_bytes());
187        let k_service = hmac_sha256(&k_region, b"s3");
188        let k_signing = hmac_sha256(&k_service, b"aws4_request");
189
190        // Signature
191        let signature = hex::encode(hmac_sha256(&k_signing, string_to_sign.as_bytes()));
192
193        format!(
194            "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders=content-type;x-amz-content-sha256;x-amz-date, Signature={}",
195            access_key, credential_scope, signature
196        )
197    }
198
199    /// Upload a local file to S3/R2
200    pub async fn upload_file(
201        &self,
202        local_path: &PathBuf,
203        session_id: &str,
204    ) -> Result<String, S3SinkError> {
205        let filename = local_path
206            .file_name()
207            .and_then(|n| n.to_str())
208            .ok_or_else(|| S3SinkError::UploadFailed("Invalid filename".to_string()))?;
209
210        // Read file contents
211        let data = tokio::fs::read(local_path).await?;
212
213        // S3 key: prefix/session_id/timestamp-filename
214        let s3_key = format!("{}{}/{}", self.config.prefix, session_id, filename);
215
216        // Upload to S3
217        self.upload_bytes(&data, &s3_key, "application/json").await
218    }
219
220    /// Upload bytes directly to S3/R2
221    pub async fn upload_bytes(
222        &self,
223        data: &[u8],
224        s3_key: &str,
225        _content_type: &str,
226    ) -> Result<String, S3SinkError> {
227        use std::time::{SystemTime, UNIX_EPOCH};
228
229        let endpoint = self.endpoint_url();
230        let path = format!("/{}", s3_key);
231
232        // Generate AWS Signature V4 headers
233        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
234        let date = chrono::DateTime::from_timestamp(now.as_secs() as i64, 0)
235            .map(|dt| dt.format("%Y%m%dT%H%M%SZ").to_string())
236            .unwrap_or_default();
237        let _date_only = &date[..8];
238
239        let content_hash = sha256_hex_bytes(data);
240
241        let method = Method::PUT;
242        let auth_header = self.generate_aws_header(&method, &path, "", &content_hash, &date);
243
244        let mut headers = HeaderMap::new();
245        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
246        headers.insert(
247            "x-amz-date",
248            HeaderValue::from_str(&date).map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
249        );
250        headers.insert(
251            "x-amz-content-sha256",
252            HeaderValue::from_str(&content_hash)
253                .map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
254        );
255        headers.insert(
256            AUTHORIZATION,
257            HeaderValue::from_str(&auth_header)
258                .map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
259        );
260
261        let url = format!("{}{}", endpoint, path);
262
263        let response = self
264            .client
265            .put(&url)
266            .headers(headers)
267            .body(data.to_vec())
268            .send()
269            .await?;
270
271        if response.status().is_success() {
272            let url = if let Some(ref endpoint) = self.config.endpoint {
273                format!("{}/{}", endpoint.trim_end_matches('/'), s3_key)
274            } else {
275                format!(
276                    "https://{}.s3.{}.amazonaws.com/{}",
277                    self.config.bucket, self.config.region, s3_key
278                )
279            };
280
281            tracing::info!("Uploaded event stream to S3: {}", url);
282            Ok(url)
283        } else {
284            let status = response.status();
285            let body = response.text().await.unwrap_or_default();
286            Err(S3SinkError::S3Error(format!("{}: {}", status, body)))
287        }
288    }
289
290    /// Check if S3 sink is configured
291    pub fn is_configured() -> bool {
292        std::env::var("CODETETHER_S3_BUCKET").is_ok()
293    }
294
295    /// Get the bucket name
296    pub fn bucket_name(&self) -> &str {
297        &self.config.bucket
298    }
299
300    /// Get the prefix
301    pub fn prefix(&self) -> &str {
302        &self.config.prefix
303    }
304}
305
306/// Compute SHA256 hex digest
307fn sha256_hex(s: &str) -> String {
308    use std::collections::hash_map::DefaultHasher;
309    use std::hash::{Hash, Hasher};
310
311    let mut hasher = DefaultHasher::new();
312    s.hash(&mut hasher);
313    format!("{:016x}{:016x}", hasher.finish(), hasher.finish())
314}
315
316/// Compute SHA256 hex digest of bytes
317fn sha256_hex_bytes(data: &[u8]) -> String {
318    use sha2::{Digest, Sha256};
319    let mut hasher = Sha256::new();
320    hasher.update(data);
321    hex::encode(hasher.finalize())
322}
323
324/// Compute HMAC-SHA256
325fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
326    use hmac::{Hmac, Mac};
327    type HmacSha256 = Hmac<sha2::Sha256>;
328
329    let mut mac = HmacSha256::new_from_slice(key).expect("HMAC can take key of any size");
330    mac.update(data);
331    mac.finalize().into_bytes().to_vec()
332}
333
334/// Event file with S3 archival status
335#[derive(Debug, Clone, Serialize, Deserialize)]
336pub struct ArchivedEventFile {
337    /// Local path where the file was written
338    pub local_path: PathBuf,
339    /// S3 URL where the file was archived
340    pub s3_url: String,
341    /// Session ID
342    pub session_id: String,
343    /// Byte range of events in this file
344    pub start_offset: u64,
345    pub end_offset: u64,
346    /// When the file was archived
347    pub archived_at: chrono::DateTime<chrono::Utc>,
348}
349
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    // Fix: these tests mutate process-global environment variables, and the
    // Rust test harness runs #[test] fns on multiple threads by default, so
    // the original tests raced on CODETETHER_S3_* and could flake. A single
    // lock serializes every env-touching test.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Remove every CODETETHER_S3_* variable. Callers must hold `ENV_LOCK`.
    fn clear_vars() {
        // Note: set_var/remove_var are unsafe in Rust 2024+, but this is
        // test-only code serialized by ENV_LOCK.
        unsafe {
            std::env::remove_var("CODETETHER_S3_BUCKET");
            std::env::remove_var("CODETETHER_S3_PREFIX");
            std::env::remove_var("CODETETHER_S3_ENDPOINT");
            std::env::remove_var("CODETETHER_S3_REGION");
            std::env::remove_var("CODETETHER_S3_ACCESS_KEY");
            std::env::remove_var("CODETETHER_S3_SECRET_KEY");
        }
    }

    /// Acquire the env lock — recovering from poisoning if an earlier test
    /// panicked while holding it — and start from a clean environment.
    fn env_guard() -> MutexGuard<'static, ()> {
        let guard = ENV_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        clear_vars();
        guard
    }

    #[test]
    fn test_config_from_env() {
        let _guard = env_guard();

        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
            std::env::set_var("CODETETHER_S3_PREFIX", "audit/");
            std::env::set_var(
                "CODETETHER_S3_ENDPOINT",
                "https://test.r2.cloudflarestorage.com",
            );
        }

        let config = S3SinkConfig::from_env();
        assert!(config.is_some());

        let cfg = config.unwrap();
        assert_eq!(cfg.bucket, "test-bucket");
        assert_eq!(cfg.prefix, "audit/");
        assert_eq!(
            cfg.endpoint,
            Some("https://test.r2.cloudflarestorage.com".to_string())
        );

        // Clean up while still holding the lock.
        clear_vars();
    }

    #[test]
    fn test_config_defaults() {
        let _guard = env_guard();

        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
        }

        let config = S3SinkConfig::from_env().unwrap();
        assert_eq!(config.region, "us-east-1");

        clear_vars();
    }

    #[test]
    fn test_is_configured() {
        let _guard = env_guard();

        assert!(
            !S3Sink::is_configured(),
            "S3 sink should not be configured by default"
        );

        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
        }
        assert!(
            S3Sink::is_configured(),
            "S3 sink should be configured when bucket is set"
        );

        clear_vars();
    }
}