codetether_agent/event_stream/
s3_sink.rs1use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap, HeaderValue};
27use reqwest::{Client, Method};
28use serde::{Deserialize, Serialize};
29use std::path::PathBuf;
30use thiserror::Error;
31
32#[derive(Error, Debug)]
34pub enum S3SinkError {
35 #[error("HTTP error: {0}")]
36 Http(#[from] reqwest::Error),
37
38 #[error("IO error: {0}")]
39 Io(#[from] std::io::Error),
40
41 #[error("Missing S3 configuration: {0}")]
42 MissingConfig(String),
43
44 #[error("Upload failed: {0}")]
45 UploadFailed(String),
46
47 #[error("S3 returned error: {0}")]
48 S3Error(String),
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct S3SinkConfig {
54 pub bucket: String,
56 pub prefix: String,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
60 pub endpoint: Option<String>,
61 #[serde(default, skip_serializing_if = "Option::is_none")]
63 pub access_key: Option<String>,
64 #[serde(default, skip_serializing_if = "Option::is_none")]
66 pub secret_key: Option<String>,
67 #[serde(default = "default_region")]
69 pub region: String,
70}
71
/// Region used when a deserialized configuration does not specify one.
fn default_region() -> String {
    String::from("us-east-1")
}
75
76impl S3SinkConfig {
77 pub fn from_env() -> Option<Self> {
79 let bucket = std::env::var("CODETETHER_S3_BUCKET").ok()?;
80
81 Some(Self {
82 bucket,
83 prefix: std::env::var("CODETETHER_S3_PREFIX").unwrap_or_else(|_| "events/".to_string()),
84 endpoint: std::env::var("CODETETHER_S3_ENDPOINT").ok(),
85 access_key: std::env::var("CODETETHER_S3_ACCESS_KEY").ok(),
86 secret_key: std::env::var("CODETETHER_S3_SECRET_KEY").ok(),
87 region: std::env::var("CODETETHER_S3_REGION")
88 .unwrap_or_else(|_| "us-east-1".to_string()),
89 })
90 }
91}
92
93pub struct S3Sink {
95 client: Client,
96 config: S3SinkConfig,
97}
98
99impl S3Sink {
100 pub async fn new(
102 bucket: String,
103 prefix: String,
104 endpoint: Option<String>,
105 access_key: Option<String>,
106 secret_key: Option<String>,
107 ) -> Result<Self, S3SinkError> {
108 let config = S3SinkConfig {
109 bucket,
110 prefix,
111 endpoint,
112 access_key,
113 secret_key,
114 region: "us-east-1".to_string(),
115 };
116
117 Self::from_config(config).await
118 }
119
120 pub async fn from_config(config: S3SinkConfig) -> Result<Self, S3SinkError> {
122 let client = Client::builder()
123 .build()
124 .map_err(|e| S3SinkError::Http(e))?;
125
126 Ok(Self { client, config })
127 }
128
129 pub async fn from_env() -> Result<Self, S3SinkError> {
131 let config = S3SinkConfig::from_env().ok_or_else(|| {
132 S3SinkError::MissingConfig("CODETETHER_S3_BUCKET not set".to_string())
133 })?;
134
135 Self::from_config(config).await
136 }
137
138 fn endpoint_url(&self) -> String {
140 if let Some(ref endpoint) = self.config.endpoint {
141 format!("{}/{}", endpoint.trim_end_matches('/'), self.config.bucket)
143 } else {
144 format!(
146 "https://{}.s3.{}.amazonaws.com",
147 self.config.bucket, self.config.region
148 )
149 }
150 }
151
152 fn generate_aws_header(
154 &self,
155 method: &Method,
156 path: &str,
157 query: &str,
158 content_hash: &str,
159 date: &str,
160 ) -> String {
161 let access_key = self.config.access_key.as_deref().unwrap_or("");
162 let secret_key = self.config.secret_key.as_deref().unwrap_or("");
163
164 let canonical_request = format!(
165 "{}\n{}\n{}\n\ncontent-type:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n\ncontent-type;x-amz-content-sha256;x-amz-date",
166 method, path, query, "application/octet-stream", content_hash, date
167 );
168
169 let canonical_request_hash = sha256_hex(&canonical_request);
171 let credential_scope = format!(
172 "{}/{}/s3/aws4_request",
173 date[..8].to_string(),
174 self.config.region
175 );
176 let string_to_sign = format!(
177 "AWS4-HMAC-SHA256\n{}\n{}\n{}",
178 date, credential_scope, canonical_request_hash
179 );
180
181 let k_date = hmac_sha256(
183 format!("AWS4{}", secret_key).as_bytes(),
184 date[..8].as_bytes(),
185 );
186 let k_region = hmac_sha256(&k_date, self.config.region.as_bytes());
187 let k_service = hmac_sha256(&k_region, b"s3");
188 let k_signing = hmac_sha256(&k_service, b"aws4_request");
189
190 let signature = hex::encode(hmac_sha256(&k_signing, string_to_sign.as_bytes()));
192
193 format!(
194 "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders=content-type;x-amz-content-sha256;x-amz-date, Signature={}",
195 access_key, credential_scope, signature
196 )
197 }
198
199 pub async fn upload_file(
201 &self,
202 local_path: &PathBuf,
203 session_id: &str,
204 ) -> Result<String, S3SinkError> {
205 let filename = local_path
206 .file_name()
207 .and_then(|n| n.to_str())
208 .ok_or_else(|| S3SinkError::UploadFailed("Invalid filename".to_string()))?;
209
210 let data = tokio::fs::read(local_path).await?;
212
213 let s3_key = format!("{}{}/{}", self.config.prefix, session_id, filename);
215
216 self.upload_bytes(&data, &s3_key, "application/json").await
218 }
219
220 pub async fn upload_bytes(
222 &self,
223 data: &[u8],
224 s3_key: &str,
225 _content_type: &str,
226 ) -> Result<String, S3SinkError> {
227 use std::time::{SystemTime, UNIX_EPOCH};
228
229 let endpoint = self.endpoint_url();
230 let path = format!("/{}", s3_key);
231
232 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
234 let date = chrono::DateTime::from_timestamp(now.as_secs() as i64, 0)
235 .map(|dt| dt.format("%Y%m%dT%H%M%SZ").to_string())
236 .unwrap_or_default();
237 let _date_only = &date[..8];
238
239 let content_hash = sha256_hex_bytes(data);
240
241 let method = Method::PUT;
242 let auth_header = self.generate_aws_header(&method, &path, "", &content_hash, &date);
243
244 let mut headers = HeaderMap::new();
245 headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
246 headers.insert(
247 "x-amz-date",
248 HeaderValue::from_str(&date).map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
249 );
250 headers.insert(
251 "x-amz-content-sha256",
252 HeaderValue::from_str(&content_hash)
253 .map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
254 );
255 headers.insert(
256 AUTHORIZATION,
257 HeaderValue::from_str(&auth_header)
258 .map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
259 );
260
261 let url = format!("{}{}", endpoint, path);
262
263 let response = self
264 .client
265 .put(&url)
266 .headers(headers)
267 .body(data.to_vec())
268 .send()
269 .await?;
270
271 if response.status().is_success() {
272 let url = if let Some(ref endpoint) = self.config.endpoint {
273 format!("{}/{}", endpoint.trim_end_matches('/'), s3_key)
274 } else {
275 format!(
276 "https://{}.s3.{}.amazonaws.com/{}",
277 self.config.bucket, self.config.region, s3_key
278 )
279 };
280
281 tracing::info!("Uploaded event stream to S3: {}", url);
282 Ok(url)
283 } else {
284 let status = response.status();
285 let body = response.text().await.unwrap_or_default();
286 Err(S3SinkError::S3Error(format!("{}: {}", status, body)))
287 }
288 }
289
290 pub fn is_configured() -> bool {
292 std::env::var("CODETETHER_S3_BUCKET").is_ok()
293 }
294
295 pub fn bucket_name(&self) -> &str {
297 &self.config.bucket
298 }
299
300 pub fn prefix(&self) -> &str {
302 &self.config.prefix
303 }
304}
305
306fn sha256_hex(s: &str) -> String {
308 use std::collections::hash_map::DefaultHasher;
309 use std::hash::{Hash, Hasher};
310
311 let mut hasher = DefaultHasher::new();
312 s.hash(&mut hasher);
313 format!("{:016x}{:016x}", hasher.finish(), hasher.finish())
314}
315
316fn sha256_hex_bytes(data: &[u8]) -> String {
318 use sha2::{Digest, Sha256};
319 let mut hasher = Sha256::new();
320 hasher.update(data);
321 hex::encode(hasher.finalize())
322}
323
324fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
326 use hmac::{Hmac, Mac};
327 type HmacSha256 = Hmac<sha2::Sha256>;
328
329 let mut mac = HmacSha256::new_from_slice(key).expect("HMAC can take key of any size");
330 mac.update(data);
331 mac.finalize().into_bytes().to_vec()
332}
333
334#[derive(Debug, Clone, Serialize, Deserialize)]
336pub struct ArchivedEventFile {
337 pub local_path: PathBuf,
339 pub s3_url: String,
341 pub session_id: String,
343 pub start_offset: u64,
345 pub end_offset: u64,
346 pub archived_at: chrono::DateTime<chrono::Utc>,
348}
349
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Every environment variable these tests set or clear.
    const ENV_VARS: [&str; 6] = [
        "CODETETHER_S3_BUCKET",
        "CODETETHER_S3_PREFIX",
        "CODETETHER_S3_ENDPOINT",
        "CODETETHER_S3_REGION",
        "CODETETHER_S3_ACCESS_KEY",
        "CODETETHER_S3_SECRET_KEY",
    ];

    /// Serializes the tests below. The test harness runs tests on parallel
    /// threads, and the process environment is shared between them, so
    /// without this lock one test can observe another test's variables.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Takes the environment lock, recovering it if an earlier test panicked
    /// while holding it so that one failure does not cascade into the rest.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Removes every `CODETETHER_S3_*` variable. Callers must hold `ENV_LOCK`.
    fn clean_env() {
        for name in ENV_VARS {
            // SAFETY: callers hold `ENV_LOCK`, which serializes the
            // environment access performed by the tests in this module.
            unsafe {
                std::env::remove_var(name);
            }
        }
    }

    #[test]
    fn test_config_from_env() {
        let _guard = lock_env();
        clean_env();

        // SAFETY: `ENV_LOCK` is held; see `clean_env`.
        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
            std::env::set_var("CODETETHER_S3_PREFIX", "audit/");
            std::env::set_var(
                "CODETETHER_S3_ENDPOINT",
                "https://test.r2.cloudflarestorage.com",
            );
        }

        let config = S3SinkConfig::from_env();
        assert!(config.is_some());

        let cfg = config.unwrap();
        assert_eq!(cfg.bucket, "test-bucket");
        assert_eq!(cfg.prefix, "audit/");
        assert_eq!(
            cfg.endpoint,
            Some("https://test.r2.cloudflarestorage.com".to_string())
        );

        clean_env();
    }

    #[test]
    fn test_config_defaults() {
        let _guard = lock_env();
        clean_env();

        // SAFETY: `ENV_LOCK` is held; see `clean_env`.
        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
        }

        let config = S3SinkConfig::from_env().unwrap();
        assert_eq!(config.region, "us-east-1");

        clean_env();
    }

    #[test]
    fn test_is_configured() {
        let _guard = lock_env();
        clean_env();

        let is_not_configured = !S3Sink::is_configured();
        assert!(
            is_not_configured,
            "S3 sink should not be configured by default"
        );

        // SAFETY: `ENV_LOCK` is held; see `clean_env`.
        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
        }
        assert!(
            S3Sink::is_configured(),
            "S3 sink should be configured when bucket is set"
        );

        clean_env();
    }
}