use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap, HeaderValue};
use reqwest::{Client, Method};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use thiserror::Error;
/// Errors produced while configuring an [`S3Sink`] or uploading objects with it.
#[derive(Error, Debug)]
pub enum S3SinkError {
    /// A `reqwest` failure: building the HTTP client or sending the request failed.
    #[error("HTTP error: {0}")]
    Http(#[from] reqwest::Error),
    /// A local I/O failure, e.g. reading the file handed to `S3Sink::upload_file`.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// A required setting is absent; the payload describes what is missing.
    #[error("Missing S3 configuration: {0}")]
    MissingConfig(String),
    /// The upload could not be prepared (e.g. invalid filename or header value).
    #[error("Upload failed: {0}")]
    UploadFailed(String),
    /// S3 answered with a non-success status; the payload is `"<status>: <response body>"`.
    #[error("S3 returned error: {0}")]
    S3Error(String),
}
/// Connection settings for [`S3Sink`]; can be loaded from `CODETETHER_S3_*` env vars.
///
/// NOTE(review): `Debug` and `Serialize` are derived, so `secret_key` shows up in debug
/// output and in serialized form whenever it is set — avoid logging or persisting this
/// struct as-is.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct S3SinkConfig {
    /// Target bucket name.
    pub bucket: String,
    /// Prefix prepended verbatim to object keys (`{prefix}{session_id}/{filename}`),
    /// so it normally ends with `/`.
    pub prefix: String,
    /// Custom S3-compatible endpoint (e.g. Cloudflare R2). When set, objects are
    /// addressed path-style under `{endpoint}/{bucket}`; when `None`, the AWS
    /// virtual-hosted URL `https://{bucket}.s3.{region}.amazonaws.com` is used.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub endpoint: Option<String>,
    /// Access key ID placed in the SigV4 `Credential` field.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub access_key: Option<String>,
    /// Secret access key from which the SigV4 signing key is derived.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub secret_key: Option<String>,
    /// Region used in the signing scope and the AWS hostname; deserialization
    /// falls back to `us-east-1` when the field is absent.
    #[serde(default = "default_region")]
    pub region: String,
}
/// Region assumed when none is configured (also the serde default for `region`).
fn default_region() -> String {
    String::from("us-east-1")
}
impl S3SinkConfig {
pub fn from_env() -> Option<Self> {
let bucket = std::env::var("CODETETHER_S3_BUCKET").ok()?;
Some(Self {
bucket,
prefix: std::env::var("CODETETHER_S3_PREFIX").unwrap_or_else(|_| "events/".to_string()),
endpoint: std::env::var("CODETETHER_S3_ENDPOINT").ok(),
access_key: std::env::var("CODETETHER_S3_ACCESS_KEY").ok(),
secret_key: std::env::var("CODETETHER_S3_SECRET_KEY").ok(),
region: std::env::var("CODETETHER_S3_REGION")
.unwrap_or_else(|_| "us-east-1".to_string()),
})
}
}
/// Uploads event files and raw byte payloads to an S3 (or S3-compatible) bucket.
pub struct S3Sink {
    /// HTTP client reused for every request issued by this sink.
    client: Client,
    /// Bucket, key prefix, optional endpoint, credentials and region.
    config: S3SinkConfig,
}
impl S3Sink {
pub async fn new(
bucket: String,
prefix: String,
endpoint: Option<String>,
access_key: Option<String>,
secret_key: Option<String>,
) -> Result<Self, S3SinkError> {
let config = S3SinkConfig {
bucket,
prefix,
endpoint,
access_key,
secret_key,
region: "us-east-1".to_string(),
};
Self::from_config(config).await
}
pub async fn from_config(config: S3SinkConfig) -> Result<Self, S3SinkError> {
let client = Client::builder()
.build()
.map_err(|e| S3SinkError::Http(e))?;
Ok(Self { client, config })
}
pub async fn from_env() -> Result<Self, S3SinkError> {
let config = S3SinkConfig::from_env().ok_or_else(|| {
S3SinkError::MissingConfig("CODETETHER_S3_BUCKET not set".to_string())
})?;
Self::from_config(config).await
}
fn endpoint_url(&self) -> String {
if let Some(ref endpoint) = self.config.endpoint {
format!("{}/{}", endpoint.trim_end_matches('/'), self.config.bucket)
} else {
format!(
"https://{}.s3.{}.amazonaws.com",
self.config.bucket, self.config.region
)
}
}
fn generate_aws_header(
&self,
method: &Method,
path: &str,
query: &str,
content_hash: &str,
date: &str,
) -> String {
let access_key = self.config.access_key.as_deref().unwrap_or("");
let secret_key = self.config.secret_key.as_deref().unwrap_or("");
let canonical_request = format!(
"{}\n{}\n{}\n\ncontent-type:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n\ncontent-type;x-amz-content-sha256;x-amz-date",
method, path, query, "application/octet-stream", content_hash, date
);
let canonical_request_hash = sha256_hex(&canonical_request);
let credential_scope = format!(
"{}/{}/s3/aws4_request",
date[..8].to_string(),
self.config.region
);
let string_to_sign = format!(
"AWS4-HMAC-SHA256\n{}\n{}\n{}",
date, credential_scope, canonical_request_hash
);
let k_date = hmac_sha256(
format!("AWS4{}", secret_key).as_bytes(),
date[..8].as_bytes(),
);
let k_region = hmac_sha256(&k_date, self.config.region.as_bytes());
let k_service = hmac_sha256(&k_region, b"s3");
let k_signing = hmac_sha256(&k_service, b"aws4_request");
let signature = hex::encode(hmac_sha256(&k_signing, string_to_sign.as_bytes()));
format!(
"AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders=content-type;x-amz-content-sha256;x-amz-date, Signature={}",
access_key, credential_scope, signature
)
}
pub async fn upload_file(
&self,
local_path: &PathBuf,
session_id: &str,
) -> Result<String, S3SinkError> {
let filename = local_path
.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| S3SinkError::UploadFailed("Invalid filename".to_string()))?;
let data = tokio::fs::read(local_path).await?;
let s3_key = format!("{}{}/{}", self.config.prefix, session_id, filename);
self.upload_bytes(&data, &s3_key, "application/json").await
}
pub async fn upload_bytes(
&self,
data: &[u8],
s3_key: &str,
_content_type: &str,
) -> Result<String, S3SinkError> {
use std::time::{SystemTime, UNIX_EPOCH};
let endpoint = self.endpoint_url();
let path = format!("/{}", s3_key);
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let date = chrono::DateTime::from_timestamp(now.as_secs() as i64, 0)
.map(|dt| dt.format("%Y%m%dT%H%M%SZ").to_string())
.unwrap_or_default();
let _date_only = &date[..8];
let content_hash = sha256_hex_bytes(data);
let method = Method::PUT;
let auth_header = self.generate_aws_header(&method, &path, "", &content_hash, &date);
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
headers.insert(
"x-amz-date",
HeaderValue::from_str(&date).map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
);
headers.insert(
"x-amz-content-sha256",
HeaderValue::from_str(&content_hash)
.map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
);
headers.insert(
AUTHORIZATION,
HeaderValue::from_str(&auth_header)
.map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
);
let url = format!("{}{}", endpoint, path);
let response = self
.client
.put(&url)
.headers(headers)
.body(data.to_vec())
.send()
.await?;
if response.status().is_success() {
let url = if let Some(ref endpoint) = self.config.endpoint {
format!("{}/{}", endpoint.trim_end_matches('/'), s3_key)
} else {
format!(
"https://{}.s3.{}.amazonaws.com/{}",
self.config.bucket, self.config.region, s3_key
)
};
tracing::info!("Uploaded event stream to S3: {}", url);
Ok(url)
} else {
let status = response.status();
let body = response.text().await.unwrap_or_default();
Err(S3SinkError::S3Error(format!("{}: {}", status, body)))
}
}
pub fn is_configured() -> bool {
std::env::var("CODETETHER_S3_BUCKET").is_ok()
}
pub fn bucket_name(&self) -> &str {
&self.config.bucket
}
pub fn prefix(&self) -> &str {
&self.config.prefix
}
}
fn sha256_hex(s: &str) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
s.hash(&mut hasher);
format!("{:016x}{:016x}", hasher.finish(), hasher.finish())
}
/// Hex-encodes the SHA-256 digest of `data` (as produced by `hex::encode`).
fn sha256_hex_bytes(data: &[u8]) -> String {
    use sha2::{Digest, Sha256};
    let digest = Sha256::digest(data);
    hex::encode(digest)
}
/// Computes HMAC-SHA256 of `data` under `key` and returns the raw tag bytes.
fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
    use hmac::{Hmac, Mac};
    // HMAC accepts keys of any length, so key setup cannot fail here.
    let mut mac =
        Hmac::<sha2::Sha256>::new_from_slice(key).expect("HMAC can take key of any size");
    mac.update(data);
    let tag = mac.finalize();
    tag.into_bytes().to_vec()
}
/// Serializable record describing an event file archived to S3.
///
/// NOTE(review): nothing in this file constructs this struct; the field meanings below
/// are inferred from their names and types — confirm against the code that fills it in.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchivedEventFile {
    /// Local path of the archived file (presumably the one passed to `S3Sink::upload_file`).
    pub local_path: PathBuf,
    /// URL of the uploaded object (presumably the value returned by the upload call).
    pub s3_url: String,
    /// Session the events belong to.
    pub session_id: String,
    /// Start of the archived range — presumably an offset into the event stream; verify units.
    pub start_offset: u64,
    /// End of the archived range — presumably an offset into the event stream; verify
    /// units and whether it is inclusive.
    pub end_offset: u64,
    /// When the archive was recorded, in UTC.
    pub archived_at: chrono::DateTime<chrono::Utc>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Every environment variable read by `S3SinkConfig::from_env` / `S3Sink::is_configured`.
    const S3_ENV_VARS: [&str; 6] = [
        "CODETETHER_S3_BUCKET",
        "CODETETHER_S3_PREFIX",
        "CODETETHER_S3_ENDPOINT",
        "CODETETHER_S3_REGION",
        "CODETETHER_S3_ACCESS_KEY",
        "CODETETHER_S3_SECRET_KEY",
    ];

    /// Serializes the tests below. libtest runs tests on parallel threads and the
    /// process environment is global. Without this lock one test's `clean_env`/`set_var`
    /// races another's assertions: e.g. `test_is_configured` can observe a bucket set
    /// by `test_config_from_env`.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Takes `ENV_LOCK`, recovering it if a previous test panicked while holding it.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Removes every `CODETETHER_S3_*` variable. Call only while holding `lock_env()`.
    fn clean_env() {
        for name in S3_ENV_VARS {
            // SAFETY: callers hold ENV_LOCK, so no other test in this module reads or
            // writes the environment concurrently. NOTE(review): tests in other modules
            // that touch env vars are not covered by this lock.
            unsafe { std::env::remove_var(name) };
        }
    }

    /// Sets one environment variable. Call only while holding `lock_env()`.
    fn set_env(name: &str, value: &str) {
        // SAFETY: same invariant as `clean_env` — ENV_LOCK is held by the caller.
        unsafe { std::env::set_var(name, value) };
    }

    #[test]
    fn test_config_from_env() {
        let _guard = lock_env();
        clean_env();
        set_env("CODETETHER_S3_BUCKET", "test-bucket");
        set_env("CODETETHER_S3_PREFIX", "audit/");
        set_env(
            "CODETETHER_S3_ENDPOINT",
            "https://test.r2.cloudflarestorage.com",
        );
        let config = S3SinkConfig::from_env();
        assert!(config.is_some());
        let cfg = config.unwrap();
        assert_eq!(cfg.bucket, "test-bucket");
        assert_eq!(cfg.prefix, "audit/");
        assert_eq!(
            cfg.endpoint,
            Some("https://test.r2.cloudflarestorage.com".to_string())
        );
        clean_env();
    }

    #[test]
    fn test_config_defaults() {
        let _guard = lock_env();
        clean_env();
        set_env("CODETETHER_S3_BUCKET", "test-bucket");
        let config = S3SinkConfig::from_env().unwrap();
        assert_eq!(config.region, "us-east-1");
        assert_eq!(config.prefix, "events/");
        assert_eq!(config.endpoint, None);
        assert_eq!(config.access_key, None);
        assert_eq!(config.secret_key, None);
        clean_env();
    }

    #[test]
    fn test_is_configured() {
        let _guard = lock_env();
        clean_env();
        assert!(
            !S3Sink::is_configured(),
            "S3 sink should not be configured by default"
        );
        set_env("CODETETHER_S3_BUCKET", "test-bucket");
        assert!(
            S3Sink::is_configured(),
            "S3 sink should be configured when bucket is set"
        );
        clean_env();
    }
}