codetether_agent/event_stream/
s3_sink.rs1use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap, HeaderValue};
27use reqwest::{Client, Method};
28use serde::{Deserialize, Serialize};
29use std::path::PathBuf;
30use thiserror::Error;
31
32#[derive(Error, Debug)]
34pub enum S3SinkError {
35 #[error("HTTP error: {0}")]
36 Http(#[from] reqwest::Error),
37
38 #[error("IO error: {0}")]
39 Io(#[from] std::io::Error),
40
41 #[error("Missing S3 configuration: {0}")]
42 MissingConfig(String),
43
44 #[error("Upload failed: {0}")]
45 UploadFailed(String),
46
47 #[error("S3 returned error: {0}")]
48 S3Error(String),
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct S3SinkConfig {
54 pub bucket: String,
56 pub prefix: String,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
60 pub endpoint: Option<String>,
61 #[serde(default, skip_serializing_if = "Option::is_none")]
63 pub access_key: Option<String>,
64 #[serde(default, skip_serializing_if = "Option::is_none")]
66 pub secret_key: Option<String>,
67 #[serde(default = "default_region")]
69 pub region: String,
70 #[serde(default)]
72 pub stub_mode: bool,
73}
74
/// Serde fallback for the `region` field: always `us-east-1`.
fn default_region() -> String {
    String::from("us-east-1")
}
78
79impl S3SinkConfig {
80 pub fn from_env() -> Option<Self> {
94 let bucket = std::env::var("S3_BUCKET")
96 .or_else(|_| std::env::var("CODETETHER_S3_BUCKET"))
97 .ok()?;
98
99 let prefix = std::env::var("S3_PREFIX")
100 .or_else(|_| std::env::var("CODETETHER_S3_PREFIX"))
101 .unwrap_or_else(|_| "events/".to_string());
102
103 let endpoint = std::env::var("S3_ENDPOINT")
104 .or_else(|_| std::env::var("CODETETHER_S3_ENDPOINT"))
105 .ok();
106
107 let access_key = std::env::var("S3_ACCESS_KEY")
108 .or_else(|_| std::env::var("CODETETHER_S3_ACCESS_KEY"))
109 .ok();
110
111 let secret_key = std::env::var("S3_SECRET_KEY")
112 .or_else(|_| std::env::var("CODETETHER_S3_SECRET_KEY"))
113 .ok();
114
115 let region = std::env::var("S3_REGION")
116 .or_else(|_| std::env::var("CODETETHER_S3_REGION"))
117 .or_else(|_| std::env::var("AWS_REGION"))
118 .unwrap_or_else(|_| "us-east-1".to_string());
119
120 let stub_mode = std::env::var("S3_STUB_MODE")
121 .or_else(|_| std::env::var("CODETETHER_S3_STUB_MODE"))
122 .map(|v| v.to_lowercase() == "true" || v == "1")
123 .unwrap_or(false);
124
125 if std::env::var("CODETETHER_S3_BUCKET").is_ok() && std::env::var("S3_BUCKET").is_err() {
127 tracing::warn!(
128 "Using legacy CODETETHER_S3_* environment variables. These are deprecated. Please migrate to S3_* naming convention."
129 );
130 }
131
132 Some(Self {
133 bucket,
134 prefix,
135 endpoint,
136 access_key,
137 secret_key,
138 region,
139 stub_mode,
140 })
141 }
142}
143
144pub struct S3Sink {
146 client: Client,
147 config: S3SinkConfig,
148}
149
150impl S3Sink {
151 #[allow(dead_code)]
152 pub async fn new(
154 bucket: String,
155 prefix: String,
156 endpoint: Option<String>,
157 access_key: Option<String>,
158 secret_key: Option<String>,
159 ) -> Result<Self, S3SinkError> {
160 let config = S3SinkConfig {
161 bucket,
162 prefix,
163 endpoint,
164 access_key,
165 secret_key,
166 region: "us-east-1".to_string(),
167 stub_mode: false,
168 };
169
170 Self::from_config(config).await
171 }
172
173 pub async fn from_config(config: S3SinkConfig) -> Result<Self, S3SinkError> {
175 let client = Client::builder().build().map_err(S3SinkError::Http)?;
176
177 Ok(Self { client, config })
178 }
179
180 pub async fn from_env() -> Result<Self, S3SinkError> {
182 let config = S3SinkConfig::from_env().ok_or_else(|| {
183 S3SinkError::MissingConfig("S3_BUCKET (or CODETETHER_S3_BUCKET) not set".to_string())
184 })?;
185
186 Self::from_config(config).await
187 }
188
189 fn endpoint_url(&self) -> String {
191 if let Some(ref endpoint) = self.config.endpoint {
192 format!("{}/{}", endpoint.trim_end_matches('/'), self.config.bucket)
194 } else {
195 format!(
197 "https://{}.s3.{}.amazonaws.com",
198 self.config.bucket, self.config.region
199 )
200 }
201 }
202
203 fn generate_aws_header(
205 &self,
206 method: &Method,
207 path: &str,
208 query: &str,
209 content_hash: &str,
210 date: &str,
211 ) -> String {
212 let access_key = self.config.access_key.as_deref().unwrap_or("");
213 let secret_key = self.config.secret_key.as_deref().unwrap_or("");
214
215 let canonical_request = format!(
216 "{}\n{}\n{}\n\ncontent-type:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n\ncontent-type;x-amz-content-sha256;x-amz-date",
217 method, path, query, "application/octet-stream", content_hash, date
218 );
219
220 let canonical_request_hash = sha256_hex(&canonical_request);
222 let credential_scope = format!("{}/{}/s3/aws4_request", &date[..8], self.config.region);
223 let string_to_sign = format!(
224 "AWS4-HMAC-SHA256\n{}\n{}\n{}",
225 date, credential_scope, canonical_request_hash
226 );
227
228 let k_date = hmac_sha256(
230 format!("AWS4{}", secret_key).as_bytes(),
231 date[..8].as_bytes(),
232 );
233 let k_region = hmac_sha256(&k_date, self.config.region.as_bytes());
234 let k_service = hmac_sha256(&k_region, b"s3");
235 let k_signing = hmac_sha256(&k_service, b"aws4_request");
236
237 let signature = hex::encode(hmac_sha256(&k_signing, string_to_sign.as_bytes()));
239
240 format!(
241 "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders=content-type;x-amz-content-sha256;x-amz-date, Signature={}",
242 access_key, credential_scope, signature
243 )
244 }
245
246 pub async fn upload_file(
248 &self,
249 local_path: &PathBuf,
250 session_id: &str,
251 ) -> Result<String, S3SinkError> {
252 let filename = local_path
253 .file_name()
254 .and_then(|n| n.to_str())
255 .ok_or_else(|| S3SinkError::UploadFailed("Invalid filename".to_string()))?;
256
257 let data = tokio::fs::read(local_path).await?;
259
260 let s3_key = format!("{}{}/{}", self.config.prefix, session_id, filename);
262
263 self.upload_bytes(&data, &s3_key, "application/json").await
265 }
266
267 pub async fn upload_bytes(
269 &self,
270 data: &[u8],
271 s3_key: &str,
272 _content_type: &str,
273 ) -> Result<String, S3SinkError> {
274 if self.config.stub_mode {
276 let url = if let Some(ref endpoint) = self.config.endpoint {
277 format!("{}/{}", endpoint.trim_end_matches('/'), s3_key)
278 } else {
279 format!("s3://{}/{}", self.config.bucket, s3_key)
280 };
281
282 tracing::debug!(
283 key = %s3_key,
284 bytes = data.len(),
285 "[STUB MODE] Skipping S3 upload"
286 );
287
288 return Ok(url);
289 }
290
291 let endpoint = self.endpoint_url();
292 let path = format!("/{}", s3_key);
293
294 let now = chrono::Utc::now();
296 let date = now.format("%Y%m%dT%H%M%SZ").to_string();
297
298 let content_hash = sha256_hex_bytes(data);
299
300 let method = Method::PUT;
301 let auth_header = self.generate_aws_header(&method, &path, "", &content_hash, &date);
302
303 let mut headers = HeaderMap::new();
304 headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
305 headers.insert(
306 "x-amz-date",
307 HeaderValue::from_str(&date).map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
308 );
309 headers.insert(
310 "x-amz-content-sha256",
311 HeaderValue::from_str(&content_hash)
312 .map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
313 );
314 headers.insert(
315 AUTHORIZATION,
316 HeaderValue::from_str(&auth_header)
317 .map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
318 );
319
320 let url = format!("{}{}", endpoint, path);
321
322 let response = self
323 .client
324 .put(&url)
325 .headers(headers)
326 .body(data.to_vec())
327 .send()
328 .await?;
329
330 if response.status().is_success() {
331 let url = if let Some(ref endpoint) = self.config.endpoint {
332 format!("{}/{}", endpoint.trim_end_matches('/'), s3_key)
333 } else {
334 format!(
335 "https://{}.s3.{}.amazonaws.com/{}",
336 self.config.bucket, self.config.region, s3_key
337 )
338 };
339
340 tracing::info!("Uploaded event stream to S3: {}", url);
341 Ok(url)
342 } else {
343 let status = response.status();
344 let body = response.text().await.unwrap_or_default();
345 Err(S3SinkError::S3Error(format!("{}: {}", status, body)))
346 }
347 }
348
349 pub fn is_configured() -> bool {
353 std::env::var("CODETETHER_S3_BUCKET")
354 .or_else(|_| std::env::var("S3_BUCKET"))
355 .is_ok()
356 }
357
358 #[allow(dead_code)]
359 pub fn bucket_name(&self) -> &str {
361 &self.config.bucket
362 }
363
364 #[allow(dead_code)]
365 pub fn prefix(&self) -> &str {
367 &self.config.prefix
368 }
369}
370
371fn sha256_hex(s: &str) -> String {
373 use std::collections::hash_map::DefaultHasher;
374 use std::hash::{Hash, Hasher};
375
376 let mut hasher = DefaultHasher::new();
377 s.hash(&mut hasher);
378 format!("{:016x}{:016x}", hasher.finish(), hasher.finish())
379}
380
381fn sha256_hex_bytes(data: &[u8]) -> String {
383 use sha2::{Digest, Sha256};
384 let mut hasher = Sha256::new();
385 hasher.update(data);
386 hex::encode(hasher.finalize())
387}
388
389fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
391 use hmac::{Hmac, Mac};
392 type HmacSha256 = Hmac<sha2::Sha256>;
393
394 let mut mac = HmacSha256::new_from_slice(key).expect("HMAC can take key of any size");
395 mac.update(data);
396 mac.finalize().into_bytes().to_vec()
397}
398
399#[derive(Debug, Clone, Serialize, Deserialize)]
401#[allow(dead_code)]
402pub struct ArchivedEventFile {
403 pub local_path: PathBuf,
405 pub s3_url: String,
407 pub session_id: String,
409 pub start_offset: u64,
411 pub end_offset: u64,
412 pub archived_at: chrono::DateTime<chrono::Utc>,
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, OnceLock};

    /// Every environment variable consulted by the configuration loader.
    const ENV_VARS: &[&str] = &[
        "CODETETHER_S3_BUCKET",
        "CODETETHER_S3_PREFIX",
        "CODETETHER_S3_ENDPOINT",
        "CODETETHER_S3_REGION",
        "CODETETHER_S3_ACCESS_KEY",
        "CODETETHER_S3_SECRET_KEY",
        "CODETETHER_S3_STUB_MODE",
        "S3_BUCKET",
        "S3_PREFIX",
        "S3_ENDPOINT",
        "S3_REGION",
        "S3_ACCESS_KEY",
        "S3_SECRET_KEY",
        "S3_STUB_MODE",
        "AWS_REGION",
    ];

    /// Process-wide lock serializing the tests that mutate the environment.
    fn env_lock() -> &'static Mutex<()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
    }

    /// Removes every variable in `ENV_VARS` so each test starts from a blank slate.
    fn clean_env() {
        for name in ENV_VARS.iter().copied() {
            // SAFETY: every caller in this module holds `env_lock()`; other
            // tests in the binary are assumed not to touch these variables.
            unsafe {
                std::env::remove_var(name);
            }
        }
    }

    #[test]
    fn test_config_from_env() {
        let _lock = env_lock().lock().expect("env lock");
        clean_env();

        // SAFETY: guarded by `env_lock()` for the duration of the test.
        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
            std::env::set_var("CODETETHER_S3_PREFIX", "audit/");
            std::env::set_var(
                "CODETETHER_S3_ENDPOINT",
                "https://test.r2.cloudflarestorage.com",
            );
        }

        let loaded = S3SinkConfig::from_env();
        assert!(loaded.is_some());

        let cfg = loaded.unwrap();
        assert_eq!(cfg.bucket, "test-bucket");
        assert_eq!(cfg.prefix, "audit/");
        assert_eq!(
            cfg.endpoint,
            Some("https://test.r2.cloudflarestorage.com".to_string())
        );

        clean_env();
    }

    #[test]
    fn test_config_defaults() {
        let _lock = env_lock().lock().expect("env lock");
        clean_env();

        // SAFETY: guarded by `env_lock()` for the duration of the test.
        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
        }

        let cfg = S3SinkConfig::from_env().unwrap();
        assert_eq!(cfg.region, "us-east-1");

        clean_env();
    }

    #[test]
    fn test_is_configured() {
        let _lock = env_lock().lock().expect("env lock");
        clean_env();

        let is_not_configured = !S3Sink::is_configured();
        assert!(
            is_not_configured,
            "S3 sink should not be configured by default"
        );

        // SAFETY: guarded by `env_lock()` for the duration of the test.
        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
        }
        assert!(
            S3Sink::is_configured(),
            "S3 sink should be configured when bucket is set"
        );

        clean_env();
    }
}