// rivet_cli/destination/s3.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use opendal::Operator;
5use opendal::blocking;
6use opendal::services::S3;
7
8use crate::config::DestinationConfig;
9use crate::error::Result;
10
11pub struct S3Destination {
12    _runtime: Arc<tokio::runtime::Runtime>,
13    op: blocking::Operator,
14    prefix: String,
15}
16
17impl S3Destination {
18    pub fn new(config: &DestinationConfig) -> Result<Self> {
19        let bucket = config
20            .bucket
21            .as_deref()
22            .ok_or_else(|| anyhow::anyhow!("S3 destination requires 'bucket'"))?;
23
24        let mut builder = S3::default().bucket(bucket);
25
26        if let Some(region) = &config.region {
27            builder = builder.region(region);
28        }
29        if let Some(endpoint) = &config.endpoint {
30            builder = builder.endpoint(endpoint);
31        }
32
33        if let Some(profile) = &config.aws_profile {
34            // SAFETY: S3Destination is constructed before any parallel chunk workers start,
35            // so no concurrent reads of AWS_PROFILE can race with this write.
36            unsafe { std::env::set_var("AWS_PROFILE", profile) };
37            log::info!("S3: using AWS profile '{}'", profile);
38        }
39
40        if let Some(env_name) = &config.access_key_env {
41            let key = std::env::var(env_name)
42                .map_err(|_| anyhow::anyhow!("env var '{}' not set for S3 access key", env_name))?;
43            builder = builder.access_key_id(&key);
44        }
45        if let Some(env_name) = &config.secret_key_env {
46            let secret = std::env::var(env_name)
47                .map_err(|_| anyhow::anyhow!("env var '{}' not set for S3 secret key", env_name))?;
48            builder = builder.secret_access_key(&secret);
49        }
50
51        let runtime = Arc::new(
52            tokio::runtime::Builder::new_multi_thread()
53                .enable_all()
54                .build()
55                .map_err(|e| anyhow::anyhow!("failed to create tokio runtime for S3: {}", e))?,
56        );
57        let _guard = runtime.enter();
58
59        let async_op = Operator::new(builder)?.finish();
60        let op = blocking::Operator::new(async_op)?;
61
62        let prefix = config.prefix.clone().unwrap_or_default();
63
64        Ok(Self {
65            _runtime: runtime,
66            op,
67            prefix,
68        })
69    }
70}
71
72impl super::Destination for S3Destination {
73    fn write(&self, local_path: &Path, remote_key: &str) -> Result<()> {
74        let key = format!("{}{}", self.prefix, remote_key);
75        let mut src = std::fs::File::open(local_path)?;
76        let mut dst = self.op.writer(&key)?.into_std_write();
77        std::io::copy(&mut src, &mut dst)?;
78        dst.close()?;
79        log::info!("uploaded s3://{}", key);
80        Ok(())
81    }
82}