Skip to main content

rivet_cli/destination/
gcs.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use opendal::Operator;
5use opendal::blocking;
6use opendal::services::Gcs;
7
8use super::gcs_auth;
9use crate::config::DestinationConfig;
10use crate::error::Result;
11
/// Destination that uploads local files to a Google Cloud Storage bucket
/// through an OpenDAL blocking operator.
///
/// Field order is significant: Rust drops fields in declaration order, so
/// `_runtime` is dropped before `op`.
pub struct GcsDestination {
    /// Tokio runtime created in `new`. It is never read after construction
    /// (hence the leading underscore); it is stored so that it lives as long
    /// as the destination — presumably because `op` drives its async work on
    /// it (TODO confirm against OpenDAL's blocking layer).
    _runtime: Arc<tokio::runtime::Runtime>,
    /// Blocking OpenDAL operator wrapping the async GCS operator built in `new`.
    op: blocking::Operator,
    /// String prepended verbatim (no separator inserted) to every remote key;
    /// empty when the config has no prefix.
    prefix: String,
}
17
18impl GcsDestination {
19    pub fn new(config: &DestinationConfig) -> Result<Self> {
20        let bucket = config
21            .bucket
22            .as_deref()
23            .ok_or_else(|| anyhow::anyhow!("GCS destination requires 'bucket'"))?;
24
25        let mut builder = Gcs::default().bucket(bucket);
26
27        if let Some(endpoint) = &config.endpoint {
28            builder = builder.endpoint(endpoint);
29        }
30
31        if config.allow_anonymous {
32            builder = builder
33                .allow_anonymous()
34                .disable_vm_metadata()
35                .disable_config_load();
36            log::info!("GCS: allow_anonymous (emulator mode; no OAuth / service account)");
37        } else if let Some(cred_file) = &config.credentials_file {
38            builder = builder.credential_path(cred_file);
39            log::info!("GCS: using credentials_file from config: {}", cred_file);
40        } else if let Some(token) = gcs_auth::try_authorized_user_token()? {
41            builder = builder.disable_vm_metadata().token(token);
42            log::info!("GCS: using access token from ADC authorized_user credentials");
43        } else {
44            log::info!(
45                "GCS: using Google default credential chain \
46                 (service account JSON via GOOGLE_APPLICATION_CREDENTIALS, then VM metadata)"
47            );
48        }
49
50        let runtime = Arc::new(
51            tokio::runtime::Builder::new_multi_thread()
52                .enable_all()
53                .build()
54                .map_err(|e| anyhow::anyhow!("failed to create tokio runtime for GCS: {}", e))?,
55        );
56        let _guard = runtime.enter();
57
58        let async_op = Operator::new(builder)?.finish();
59        let op = blocking::Operator::new(async_op)?;
60
61        let prefix = config.prefix.clone().unwrap_or_default();
62
63        Ok(Self {
64            _runtime: runtime,
65            op,
66            prefix,
67        })
68    }
69}
70
71impl super::Destination for GcsDestination {
72    fn write(&self, local_path: &Path, remote_key: &str) -> Result<()> {
73        let key = format!("{}{}", self.prefix, remote_key);
74        let mut src = std::fs::File::open(local_path)?;
75        let mut dst = self.op.writer(&key)?.into_std_write();
76        std::io::copy(&mut src, &mut dst)?;
77        dst.close()?;
78        log::info!("uploaded gs://{}", key);
79        Ok(())
80    }
81}