// codetether_agent/event_stream/s3_sink.rs

use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap, HeaderValue};
use reqwest::{Client, Method};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use thiserror::Error;
31
/// Errors produced while configuring the sink or uploading objects.
#[derive(Error, Debug)]
pub enum S3SinkError {
    /// Transport-level failure from the underlying HTTP client.
    #[error("HTTP error: {0}")]
    Http(#[from] reqwest::Error),

    /// Local filesystem failure (e.g. reading the file to upload).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Required configuration (bucket name) was not provided.
    #[error("Missing S3 configuration: {0}")]
    MissingConfig(String),

    /// Client-side upload preparation failed (bad filename, bad header value).
    #[error("Upload failed: {0}")]
    UploadFailed(String),

    /// The S3 endpoint answered with a non-success HTTP status.
    #[error("S3 returned error: {0}")]
    S3Error(String),
}
50
/// Configuration for the S3 sink.
///
/// Works against AWS S3 or any S3-compatible store (e.g. Cloudflare R2)
/// when `endpoint` is set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct S3SinkConfig {
    /// Target bucket name.
    pub bucket: String,
    /// Key prefix prepended to every uploaded object (e.g. "events/").
    pub prefix: String,
    /// Custom endpoint URL for S3-compatible stores; `None` means AWS S3.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub endpoint: Option<String>,
    /// Access key id; `None` falls back to an empty credential when signing.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub access_key: Option<String>,
    /// Secret access key; `None` falls back to an empty credential when signing.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub secret_key: Option<String>,
    /// AWS region used for the request URL and SigV4 scope.
    #[serde(default = "default_region")]
    pub region: String,
    /// When true, uploads are skipped and the would-be URL is returned.
    #[serde(default)]
    pub stub_mode: bool,
}
74
/// Serde default for `S3SinkConfig::region`.
fn default_region() -> String {
    String::from("us-east-1")
}
78
79impl S3SinkConfig {
80 pub fn from_env() -> Option<Self> {
94 let bucket = std::env::var("S3_BUCKET")
96 .or_else(|_| std::env::var("CODETETHER_S3_BUCKET"))
97 .ok()?;
98
99 let prefix = std::env::var("S3_PREFIX")
100 .or_else(|_| std::env::var("CODETETHER_S3_PREFIX"))
101 .unwrap_or_else(|_| "events/".to_string());
102
103 let endpoint = std::env::var("S3_ENDPOINT")
104 .or_else(|_| std::env::var("CODETETHER_S3_ENDPOINT"))
105 .ok();
106
107 let access_key = std::env::var("S3_ACCESS_KEY")
108 .or_else(|_| std::env::var("CODETETHER_S3_ACCESS_KEY"))
109 .ok();
110
111 let secret_key = std::env::var("S3_SECRET_KEY")
112 .or_else(|_| std::env::var("CODETETHER_S3_SECRET_KEY"))
113 .ok();
114
115 let region = std::env::var("S3_REGION")
116 .or_else(|_| std::env::var("CODETETHER_S3_REGION"))
117 .or_else(|_| std::env::var("AWS_REGION"))
118 .unwrap_or_else(|_| "us-east-1".to_string());
119
120 let stub_mode = std::env::var("S3_STUB_MODE")
121 .or_else(|_| std::env::var("CODETETHER_S3_STUB_MODE"))
122 .map(|v| v.to_lowercase() == "true" || v == "1")
123 .unwrap_or(false);
124
125 if std::env::var("CODETETHER_S3_BUCKET").is_ok() && std::env::var("S3_BUCKET").is_err() {
127 tracing::warn!(
128 "Using legacy CODETETHER_S3_* environment variables. These are deprecated. Please migrate to S3_* naming convention."
129 );
130 }
131
132 Some(Self {
133 bucket,
134 prefix,
135 endpoint,
136 access_key,
137 secret_key,
138 region,
139 stub_mode,
140 })
141 }
142}
143
/// Uploads event-stream files to S3 (or an S3-compatible store) using a
/// hand-rolled AWS Signature Version 4 signer.
pub struct S3Sink {
    // Shared reqwest client, reused across uploads.
    client: Client,
    config: S3SinkConfig,
}
149
150impl S3Sink {
151 #[allow(dead_code)]
152 pub async fn new(
154 bucket: String,
155 prefix: String,
156 endpoint: Option<String>,
157 access_key: Option<String>,
158 secret_key: Option<String>,
159 ) -> Result<Self, S3SinkError> {
160 let config = S3SinkConfig {
161 bucket,
162 prefix,
163 endpoint,
164 access_key,
165 secret_key,
166 region: "us-east-1".to_string(),
167 stub_mode: false,
168 };
169
170 Self::from_config(config).await
171 }
172
173 pub async fn from_config(config: S3SinkConfig) -> Result<Self, S3SinkError> {
175 let client = Client::builder().build().map_err(S3SinkError::Http)?;
176
177 Ok(Self { client, config })
178 }
179
180 pub async fn from_env() -> Result<Self, S3SinkError> {
182 let config = S3SinkConfig::from_env().ok_or_else(|| {
183 S3SinkError::MissingConfig("S3_BUCKET (or CODETETHER_S3_BUCKET) not set".to_string())
184 })?;
185
186 Self::from_config(config).await
187 }
188
189 fn endpoint_url(&self) -> String {
191 if let Some(ref endpoint) = self.config.endpoint {
192 format!("{}/{}", endpoint.trim_end_matches('/'), self.config.bucket)
194 } else {
195 format!(
197 "https://{}.s3.{}.amazonaws.com",
198 self.config.bucket, self.config.region
199 )
200 }
201 }
202
203 fn generate_aws_header(
205 &self,
206 method: &Method,
207 path: &str,
208 query: &str,
209 content_hash: &str,
210 date: &str,
211 ) -> String {
212 let access_key = self.config.access_key.as_deref().unwrap_or("");
213 let secret_key = self.config.secret_key.as_deref().unwrap_or("");
214
215 let canonical_request = format!(
216 "{}\n{}\n{}\n\ncontent-type:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n\ncontent-type;x-amz-content-sha256;x-amz-date",
217 method, path, query, "application/octet-stream", content_hash, date
218 );
219
220 let canonical_request_hash = sha256_hex(&canonical_request);
222 let credential_scope = format!("{}/{}/s3/aws4_request", &date[..8], self.config.region);
223 let string_to_sign = format!(
224 "AWS4-HMAC-SHA256\n{}\n{}\n{}",
225 date, credential_scope, canonical_request_hash
226 );
227
228 let k_date = hmac_sha256(
230 format!("AWS4{}", secret_key).as_bytes(),
231 date[..8].as_bytes(),
232 );
233 let k_region = hmac_sha256(&k_date, self.config.region.as_bytes());
234 let k_service = hmac_sha256(&k_region, b"s3");
235 let k_signing = hmac_sha256(&k_service, b"aws4_request");
236
237 let signature = hex::encode(hmac_sha256(&k_signing, string_to_sign.as_bytes()));
239
240 format!(
241 "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders=content-type;x-amz-content-sha256;x-amz-date, Signature={}",
242 access_key, credential_scope, signature
243 )
244 }
245
246 pub async fn upload_file(
248 &self,
249 local_path: &PathBuf,
250 session_id: &str,
251 ) -> Result<String, S3SinkError> {
252 let filename = local_path
253 .file_name()
254 .and_then(|n| n.to_str())
255 .ok_or_else(|| S3SinkError::UploadFailed("Invalid filename".to_string()))?;
256
257 let data = tokio::fs::read(local_path).await?;
259
260 let s3_key = format!("{}{}/{}", self.config.prefix, session_id, filename);
262
263 self.upload_bytes(&data, &s3_key, "application/json").await
265 }
266
267 pub async fn upload_bytes(
269 &self,
270 data: &[u8],
271 s3_key: &str,
272 _content_type: &str,
273 ) -> Result<String, S3SinkError> {
274 if self.config.stub_mode {
276 let url = if let Some(ref endpoint) = self.config.endpoint {
277 format!("{}/{}", endpoint.trim_end_matches('/'), s3_key)
278 } else {
279 format!("s3://{}/{}", self.config.bucket, s3_key)
280 };
281
282 tracing::debug!(
283 key = %s3_key,
284 bytes = data.len(),
285 "[STUB MODE] Skipping S3 upload"
286 );
287
288 return Ok(url);
289 }
290
291 use std::time::{SystemTime, UNIX_EPOCH};
292
293 let endpoint = self.endpoint_url();
294 let path = format!("/{}", s3_key);
295
296 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
298 let date = chrono::DateTime::from_timestamp(now.as_secs() as i64, 0)
299 .map(|dt| dt.format("%Y%m%dT%H%M%SZ").to_string())
300 .unwrap_or_default();
301 let _date_only = &date[..8];
302
303 let content_hash = sha256_hex_bytes(data);
304
305 let method = Method::PUT;
306 let auth_header = self.generate_aws_header(&method, &path, "", &content_hash, &date);
307
308 let mut headers = HeaderMap::new();
309 headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
310 headers.insert(
311 "x-amz-date",
312 HeaderValue::from_str(&date).map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
313 );
314 headers.insert(
315 "x-amz-content-sha256",
316 HeaderValue::from_str(&content_hash)
317 .map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
318 );
319 headers.insert(
320 AUTHORIZATION,
321 HeaderValue::from_str(&auth_header)
322 .map_err(|e| S3SinkError::UploadFailed(e.to_string()))?,
323 );
324
325 let url = format!("{}{}", endpoint, path);
326
327 let response = self
328 .client
329 .put(&url)
330 .headers(headers)
331 .body(data.to_vec())
332 .send()
333 .await?;
334
335 if response.status().is_success() {
336 let url = if let Some(ref endpoint) = self.config.endpoint {
337 format!("{}/{}", endpoint.trim_end_matches('/'), s3_key)
338 } else {
339 format!(
340 "https://{}.s3.{}.amazonaws.com/{}",
341 self.config.bucket, self.config.region, s3_key
342 )
343 };
344
345 tracing::info!("Uploaded event stream to S3: {}", url);
346 Ok(url)
347 } else {
348 let status = response.status();
349 let body = response.text().await.unwrap_or_default();
350 Err(S3SinkError::S3Error(format!("{}: {}", status, body)))
351 }
352 }
353
354 pub fn is_configured() -> bool {
358 std::env::var("CODETETHER_S3_BUCKET")
359 .or_else(|_| std::env::var("S3_BUCKET"))
360 .is_ok()
361 }
362
363 #[allow(dead_code)]
364 pub fn bucket_name(&self) -> &str {
366 &self.config.bucket
367 }
368
369 #[allow(dead_code)]
370 pub fn prefix(&self) -> &str {
372 &self.config.prefix
373 }
374}
375
376fn sha256_hex(s: &str) -> String {
378 use std::collections::hash_map::DefaultHasher;
379 use std::hash::{Hash, Hasher};
380
381 let mut hasher = DefaultHasher::new();
382 s.hash(&mut hasher);
383 format!("{:016x}{:016x}", hasher.finish(), hasher.finish())
384}
385
386fn sha256_hex_bytes(data: &[u8]) -> String {
388 use sha2::{Digest, Sha256};
389 let mut hasher = Sha256::new();
390 hasher.update(data);
391 hex::encode(hasher.finalize())
392}
393
394fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
396 use hmac::{Hmac, Mac};
397 type HmacSha256 = Hmac<sha2::Sha256>;
398
399 let mut mac = HmacSha256::new_from_slice(key).expect("HMAC can take key of any size");
400 mac.update(data);
401 mac.finalize().into_bytes().to_vec()
402}
403
/// Record of an event file that has been archived to S3.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)]
pub struct ArchivedEventFile {
    /// Path of the original file on local disk.
    pub local_path: PathBuf,
    /// URL of the uploaded object.
    pub s3_url: String,
    /// Session the events belong to.
    pub session_id: String,
    /// Byte offset range covered by this file.
    /// NOTE(review): presumably offsets into the session's event stream — confirm against writer.
    pub start_offset: u64,
    pub end_offset: u64,
    /// When the archive upload completed (UTC).
    pub archived_at: chrono::DateTime<chrono::Utc>,
}
420
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, OnceLock};

    // Process-wide lock: these tests mutate shared environment variables,
    // so they must not run concurrently with each other.
    fn env_lock() -> &'static Mutex<()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
    }

    // Remove every variable `from_env`/`is_configured` reads so each test
    // starts from a clean slate. `unsafe` because env mutation is unsound
    // under concurrent access (guarded by `env_lock`).
    fn clean_env() {
        unsafe {
            std::env::remove_var("CODETETHER_S3_BUCKET");
            std::env::remove_var("CODETETHER_S3_PREFIX");
            std::env::remove_var("CODETETHER_S3_ENDPOINT");
            std::env::remove_var("CODETETHER_S3_REGION");
            std::env::remove_var("CODETETHER_S3_ACCESS_KEY");
            std::env::remove_var("CODETETHER_S3_SECRET_KEY");
            std::env::remove_var("CODETETHER_S3_STUB_MODE");
            std::env::remove_var("S3_BUCKET");
            std::env::remove_var("S3_PREFIX");
            std::env::remove_var("S3_ENDPOINT");
            std::env::remove_var("S3_REGION");
            std::env::remove_var("S3_ACCESS_KEY");
            std::env::remove_var("S3_SECRET_KEY");
            std::env::remove_var("S3_STUB_MODE");
            std::env::remove_var("AWS_REGION");
        }
    }

    // Legacy CODETETHER_S3_* variables are still honored by from_env.
    #[test]
    fn test_config_from_env() {
        let _lock = env_lock().lock().expect("env lock");
        clean_env();

        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
            std::env::set_var("CODETETHER_S3_PREFIX", "audit/");
            std::env::set_var(
                "CODETETHER_S3_ENDPOINT",
                "https://test.r2.cloudflarestorage.com",
            );
        }

        let config = S3SinkConfig::from_env();
        assert!(config.is_some());

        let cfg = config.unwrap();
        assert_eq!(cfg.bucket, "test-bucket");
        assert_eq!(cfg.prefix, "audit/");
        assert_eq!(
            cfg.endpoint,
            Some("https://test.r2.cloudflarestorage.com".to_string())
        );

        clean_env();
    }

    // With only a bucket set, the region falls back to us-east-1.
    #[test]
    fn test_config_defaults() {
        let _lock = env_lock().lock().expect("env lock");
        clean_env();

        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
        }

        let config = S3SinkConfig::from_env().unwrap();
        assert_eq!(config.region, "us-east-1");

        clean_env();
    }

    // is_configured flips from false to true once a bucket variable is set.
    #[test]
    fn test_is_configured() {
        let _lock = env_lock().lock().expect("env lock");
        clean_env();

        let is_not_configured = !S3Sink::is_configured();
        assert!(
            is_not_configured,
            "S3 sink should not be configured by default"
        );

        unsafe {
            std::env::set_var("CODETETHER_S3_BUCKET", "test-bucket");
        }
        assert!(
            S3Sink::is_configured(),
            "S3 sink should be configured when bucket is set"
        );

        clean_env();
    }
}