1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
use std::path::Path;
use std::sync::Arc;
use opendal::Operator;
use opendal::blocking;
use opendal::layers::RetryLayer;
use opendal::services::S3;
use crate::config::DestinationConfig;
use crate::error::Result;
/// Destination backed by an S3 bucket, accessed through a blocking OpenDAL
/// operator.
pub struct S3Destination {
/// Tokio runtime built in `S3Destination::new`. Nothing in this file reads
/// the field afterwards; presumably it is held only so the runtime stays
/// alive for as long as `op` is in use (NOTE(review): confirm).
_runtime: Arc<tokio::runtime::Runtime>,
/// Blocking OpenDAL operator that every write/list/read/stat/copy/delete
/// call in this file goes through.
op: blocking::Operator,
/// String prepended verbatim (no separator inserted) to every key passed to
/// the operator; empty when the config carries no prefix.
prefix: String,
}
/// Read a credential from an env var into a `Zeroizing<String>`.
///
/// SecOps: the underlying heap buffer is zeroed on drop instead of lingering
/// in freed memory (visible via core dump, ptrace, or heap reuse). OpenDAL
/// stores its own copy inside `reqsign`; the `Zeroizing` wrapper only protects
/// our transient handle.
///
/// `env_name` is the variable to read; `label` names the credential slot
/// (e.g. "access key") and is only used to build the error message.
///
/// # Errors
///
/// Fails when `env_name` is unset, and — with a distinct message — when it is
/// set to a value that is not valid Unicode. Neither message contains the
/// variable's value.
fn read_credential_env(env_name: &str, label: &str) -> Result<zeroize::Zeroizing<String>> {
    let value = std::env::var(env_name).map_err(|err| match err {
        std::env::VarError::NotPresent => {
            anyhow::anyhow!("env var '{}' not set for S3 {}", env_name, label)
        }
        // The payload holds the raw secret bytes, so it is deliberately not
        // formatted into the error. Reporting this case as "not set" would
        // send the operator looking for a missing export that is in fact there.
        std::env::VarError::NotUnicode(_) => anyhow::anyhow!(
            "env var '{}' for S3 {} is set but its value is not valid Unicode",
            env_name,
            label
        ),
    })?;
    Ok(zeroize::Zeroizing::new(value))
}
impl S3Destination {
    /// Build an S3 destination from `config`.
    ///
    /// `bucket` is mandatory. `region`, `endpoint`, the three `*_env`
    /// credential settings and `aws_profile` are each applied to the OpenDAL
    /// S3 builder only when present. A multi-threaded tokio runtime is created
    /// and entered while the blocking operator is constructed, then stored on
    /// the returned value.
    ///
    /// # Errors
    ///
    /// Returns an error when `bucket` is missing, when a configured credential
    /// env var cannot be read, when the tokio runtime cannot be built, or when
    /// OpenDAL fails to construct the operator.
    pub fn new(config: &DestinationConfig) -> Result<Self> {
        let bucket_name = config
            .bucket
            .as_deref()
            .ok_or_else(|| anyhow::anyhow!("S3 destination requires 'bucket'"))?;

        let mut s3 = S3::default().bucket(bucket_name);

        if let Some(region) = config.region.as_ref() {
            s3 = s3.region(region);
        }
        if let Some(endpoint) = config.endpoint.as_ref() {
            s3 = s3.endpoint(endpoint);
        }

        // Static credentials: each value is read through `read_credential_env`
        // so our copy is wiped as soon as the builder has taken its own.
        if let Some(var) = config.access_key_env.as_ref() {
            let access_key = read_credential_env(var, "access key")?;
            s3 = s3.access_key_id(access_key.as_str());
        }
        if let Some(var) = config.secret_key_env.as_ref() {
            let secret_key = read_credential_env(var, "secret key")?;
            s3 = s3.secret_access_key(secret_key.as_str());
        }
        // STS session token: needed whenever `access_key_id` starts with
        // `ASIA…` rather than `AKIA…`. See `docs/cloud-auth.md`.
        if let Some(var) = config.session_token_env.as_ref() {
            let session_token = read_credential_env(var, "session token")?;
            s3 = s3.session_token(session_token.as_str());
        }

        // `aws_profile` goes through reqsign's per-instance loader (no
        // `env::set_var`, so it is safe under `--parallel-exports`). Caveat:
        // the default chain falls through to IMDS, which hangs off-EC2 — see
        // `docs/cloud-auth.md` for the AWS SSO / Identity Center bridge.
        if let Some(profile) = config.aws_profile.as_ref() {
            log::info!("S3: using AWS profile '{}'", profile);
            let profile_config = reqsign::AwsConfig {
                profile: profile.clone(),
                ..Default::default()
            };
            let cred_config = profile_config.from_profile().from_env();
            let loader = reqsign::AwsDefaultLoader::new(reqwest::Client::new(), cred_config);
            s3 = s3.customized_credential_load(Box::new(loader));
        }

        let built_runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .map_err(|e| anyhow::anyhow!("failed to create tokio runtime for S3: {}", e))?;
        let runtime = Arc::new(built_runtime);
        // The blocking operator is constructed while this runtime is entered.
        let _guard = runtime.enter();

        // See gcs.rs for the rationale; the same `RetryLayer` is applied to S3
        // so transient hyper / SDK errors are absorbed inside the operator
        // instead of escalating to a chunk-level re-fetch.
        let async_op = Operator::new(s3)?;
        let retry = RetryLayer::new()
            .with_max_times(5)
            .with_min_delay(std::time::Duration::from_millis(200))
            .with_max_delay(std::time::Duration::from_secs(10))
            .with_jitter();
        let op = blocking::Operator::new(async_op.layer(retry).finish())?;

        Ok(Self {
            _runtime: runtime,
            op,
            prefix: config.prefix.clone().unwrap_or_default(),
        })
    }
}
impl super::Destination for S3Destination {
    /// Upload the file at `local_path` to `remote_key` (joined onto the
    /// configured prefix), streaming it through the operator's writer and
    /// closing the writer before reporting success.
    fn write(&self, local_path: &Path, remote_key: &str) -> Result<()> {
        let object_key = format!("{}{}", self.prefix, remote_key);
        let mut local_file = std::fs::File::open(local_path)?;
        let mut upload = self.op.writer(&object_key)?.into_std_write();
        std::io::copy(&mut local_file, &mut upload)?;
        upload.close()?;
        log::info!("uploaded s3://{}", object_key);
        Ok(())
    }

    /// Static description of this destination's write semantics.
    fn capabilities(&self) -> super::DestinationCapabilities {
        super::DestinationCapabilities {
            commit_protocol: super::WriteCommitProtocol::FinalizeOnClose,
            idempotent_overwrite: true,
            retry_safe: true,
            partial_write_risk: false,
        }
    }

    // ── ADR-0013 read surface (delegates to opendal) ─────────────────────
    //
    // Every key/prefix argument below is relative to the configured prefix;
    // `self.prefix` is joined on exactly as `write` does, so callers see one
    // consistent namespace. Keys handed back are likewise relative to the
    // configured prefix — symmetric with `write`'s `remote_key` argument.

    /// Recursively list the files under `prefix`, returning their keys
    /// (configured prefix stripped) and sizes. Non-file entries are skipped.
    fn list_prefix(&self, prefix: &str) -> Result<Vec<super::ObjectMeta>> {
        // opendal expects a trailing `/` for directory listings. The empty
        // string (bucket root) is fine as-is; any other path gets a `/`
        // appended when the caller did not supply one.
        let mut dir = format!("{}{}", self.prefix, prefix);
        if !dir.is_empty() && !dir.ends_with('/') {
            dir.push('/');
        }

        let entries = self.op.list_options(
            &dir,
            opendal::options::ListOptions {
                recursive: true,
                ..Default::default()
            },
        )?;

        let files: Vec<super::ObjectMeta> = entries
            .into_iter()
            .filter(|entry| entry.metadata().mode() == opendal::EntryMode::FILE)
            .map(|entry| {
                // entry.path() is a bucket-root-absolute key; drop our
                // configured prefix so the result is comparable to the
                // values callers pass to `write`.
                let absolute = entry.path();
                let relative = absolute
                    .strip_prefix(self.prefix.as_str())
                    .unwrap_or(absolute);
                super::ObjectMeta {
                    key: relative.to_string(),
                    size_bytes: entry.metadata().content_length(),
                }
            })
            .collect();
        Ok(files)
    }

    /// Fetch the whole object at `key` into memory.
    fn read(&self, key: &str) -> Result<Vec<u8>> {
        let object_key = format!("{}{}", self.prefix, key);
        Ok(self.op.read(&object_key)?.to_vec())
    }

    /// Stat `key`. `Ok(None)` means the object is absent; any other failure
    /// is returned as an error.
    fn head(&self, key: &str) -> Result<Option<super::ObjectMeta>> {
        let object_key = format!("{}{}", self.prefix, key);
        // `stat` reports absent keys as a NotFound error; opendal exposes the
        // kind on the error, which keeps "Ok(None) is unambiguous absence".
        let size_bytes = match self.op.stat(&object_key) {
            Ok(meta) => meta.content_length(),
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => return Ok(None),
            Err(e) => return Err(e.into()),
        };
        Ok(Some(super::ObjectMeta {
            key: key.to_string(),
            size_bytes,
        }))
    }

    /// Move `from` to `to` as a copy followed by a delete of the source.
    fn r#move(&self, from: &str, to: &str) -> Result<()> {
        // S3 is not POSIX — there is no native rename. Mirrors the GCS path:
        // explicit copy + delete instead of `op.rename` (which opendal 0.55
        // returns Unsupported for on both S3 and GCS). ADR-0012 M9
        // best-effort: if the copy succeeds and the delete fails, the object
        // stays reachable at both paths and re-trips M9 on the next resume.
        let source_key = format!("{}{}", self.prefix, from);
        let target_key = format!("{}{}", self.prefix, to);
        self.op.copy(&source_key, &target_key)?;
        self.op.delete(&source_key)?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    // ── aws_profile thread-safety ─────────────────────────────────────────────
    //
    // Invariant under test: when `aws_profile` is set in DestinationConfig the
    // code must NOT call `std::env::set_var("AWS_PROFILE", ...)` — that would
    // be a data race under --parallel-exports. It builds a per-export
    // `reqsign::AwsConfig { profile, .. }` instance instead.
    //
    // The tests below repeat the AwsConfig construction that
    // S3Destination::new() performs for the aws_profile branch, so they need
    // neither S3 credentials nor network access.

    #[test]
    fn aws_profile_does_not_mutate_aws_profile_env_var() {
        let current_aws_profile = || std::env::var("AWS_PROFILE").ok();

        let before = current_aws_profile();

        // Same construction as S3Destination::new(): an AwsConfig carrying a
        // profile name, with no env::set_var call anywhere on the path.
        let cred_config = reqsign::AwsConfig {
            profile: "unit-test-profile-rivet".to_string(),
            ..Default::default()
        }
        .from_profile()
        .from_env();
        // Only env isolation is checked, so the config is dropped unused.
        drop(cred_config);

        let after = current_aws_profile();
        assert_eq!(
            before, after,
            "building AwsConfig with a named profile must not mutate the AWS_PROFILE env var"
        );
    }

    #[test]
    fn aws_profile_independent_configs_are_independent() {
        // Two AwsConfig values built with different profile names must each
        // keep their own name: no shared global state between them.
        let with_profile = |name: &str| reqsign::AwsConfig {
            profile: name.to_string(),
            ..Default::default()
        };
        let cfg_a = with_profile("profile-a");
        let cfg_b = with_profile("profile-b");

        // Each field reflects what was set — no cross-contamination.
        assert_eq!(cfg_a.profile, "profile-a");
        assert_eq!(cfg_b.profile, "profile-b");
    }

    #[test]
    fn aws_profile_config_field_parsed_from_destination_config() {
        use crate::config::DestinationConfig;
        let yaml = r#"
type: s3
bucket: my-bucket
aws_profile: staging
"#;
        let parsed: DestinationConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert_eq!(parsed.aws_profile.as_deref(), Some("staging"));
    }

    // ── session_token_env (STS / SSO / AssumeRole credentials) ────────────────

    #[test]
    fn session_token_env_field_parsed_from_destination_config() {
        use crate::config::DestinationConfig;
        let yaml = r#"
type: s3
bucket: my-bucket
access_key_env: AWS_ACCESS_KEY_ID
secret_key_env: AWS_SECRET_ACCESS_KEY
session_token_env: AWS_SESSION_TOKEN
"#;
        let parsed: DestinationConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert_eq!(
            parsed.session_token_env.as_deref(),
            Some("AWS_SESSION_TOKEN")
        );
    }

    #[test]
    fn read_credential_env_missing_var_errors_with_label() {
        // A name no runner is expected to define.
        const UNSET_VAR: &str = "RIVET_TEST_S3_TOKEN_DEFINITELY_UNSET_XYZ";
        // SAFETY: test-only, and no other test in this module uses this name.
        // NOTE(review): the default test harness runs tests on several
        // threads, so this relies on nothing else in the test binary reading
        // the environment outside `std::env` at the same time — confirm, or
        // run with `--test-threads=1`.
        unsafe { std::env::remove_var(UNSET_VAR) };

        let err = super::read_credential_env(UNSET_VAR, "session token").unwrap_err();
        let msg = format!("{err:#}");

        // The message must carry the env-var name (so the operator can grep
        // for it) and the credential label (so they know which slot is empty).
        assert!(
            msg.contains(UNSET_VAR),
            "missing env var name in error: {msg}"
        );
        assert!(
            msg.contains("session token"),
            "missing credential label in error: {msg}"
        );
    }

    #[test]
    fn read_credential_env_reads_value_into_zeroizing() {
        const PRESENT_VAR: &str = "RIVET_TEST_S3_TOKEN_PRESENT_XYZ";
        // SAFETY: test-only, and no other test in this module uses this name.
        // NOTE(review): the default test harness runs tests on several
        // threads, so this relies on nothing else in the test binary reading
        // the environment outside `std::env` at the same time — confirm, or
        // run with `--test-threads=1`.
        unsafe { std::env::set_var(PRESENT_VAR, "fake-token-value") };

        let credential = super::read_credential_env(PRESENT_VAR, "session token").unwrap();
        assert_eq!(credential.as_str(), "fake-token-value");

        // SAFETY: same reasoning as the `set_var` call above.
        unsafe { std::env::remove_var(PRESENT_VAR) };
    }
}