use chrono::{DateTime, Utc};
use opendal::Operator;
use opendal::services::Azblob;
use super::cloud::{CloudBackend, CloudDestination};
use crate::config::DestinationConfig;
use crate::error::Result;
/// Remaining-lifetime cutoff used by `parse_sas_expiry_status`: a SAS token whose time to
/// expiry is at or below this duration (but still in the future) is reported as
/// `SasExpiryStatus::NearExpiry` instead of `SasExpiryStatus::Healthy`.
const SAS_NEAR_EXPIRY_THRESHOLD: chrono::Duration = chrono::Duration::minutes(60);
/// Result of inspecting the `se=` (expiry) parameter of an Azure SAS token relative to a
/// caller-supplied "now". Produced by `parse_sas_expiry_status`.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum SasExpiryStatus {
/// The token expires later than `SAS_NEAR_EXPIRY_THRESHOLD` from now.
Healthy { expires_at: DateTime<Utc> },
/// The token is still valid but expires within `SAS_NEAR_EXPIRY_THRESHOLD`.
NearExpiry {
/// Expiry instant converted to UTC.
expires_at: DateTime<Utc>,
/// Time left until `expires_at`, measured from the supplied "now".
remaining: chrono::Duration,
},
/// The expiry instant is at or before the supplied "now".
Expired { expires_at: DateTime<Utc> },
/// The token contains no `se` parameter at all.
NoExpiry,
/// An `se` parameter was present but its value could not be parsed as a timestamp.
/// `raw` holds the value after the parser's limited percent-decoding; it is empty when
/// `se` appeared without an `=`.
Unparseable { raw: String },
}
/// Rewrites a minute-precision timestamp (`YYYY-MM-DDThh:mm<TZD>`) into its
/// seconds-precision equivalent by inserting `:00` before the time-zone designator.
///
/// Returns `None` when `stamp` does not have that shape (including timestamps that
/// already carry seconds), so callers can fall back to their original parse error.
fn pad_missing_seconds(stamp: &str) -> Option<String> {
    // `str::get` keeps this panic-free for short input and for non-ASCII input where
    // byte 16 is not a character boundary.
    let head = stamp.get(..16)?;
    let zone = stamp.get(16..)?;
    let head_bytes = head.as_bytes();
    // A seconds-precision value has ':' at byte 16, so it never matches the zone check.
    let minute_precision = head_bytes[10] == b'T'
        && head_bytes[13] == b':'
        && matches!(zone.as_bytes().first(), Some(b'Z' | b'z' | b'+' | b'-'));
    minute_precision.then(|| format!("{head}:00{zone}"))
}

/// Classifies a SAS token by its `se=` (expiry) query parameter relative to `now`.
///
/// * `token` — the SAS query string; any leading `?` characters are ignored.
/// * `now` — the reference instant; passed in so callers (and tests) control the clock.
///
/// Only the first `se` pair in the query string is considered. Its value gets a limited
/// percent-decoding (`%3A`/`%3a` → `:`, `%2B`/`%2b` → `+`) and is parsed as RFC 3339.
/// Minute-precision values (`YYYY-MM-DDThh:mm<TZD>`, which Azure also accepts for `se`)
/// are parsed as if `:00` seconds had been given. Date-only values are still reported as
/// `Unparseable`.
///
/// Returns:
/// * `NoExpiry` when no `se` pair exists;
/// * `Unparseable` when `se` has no `=` (empty `raw`) or its value is not a timestamp;
/// * `Expired` when the expiry is at or before `now`;
/// * `NearExpiry` when the remaining time is at most `SAS_NEAR_EXPIRY_THRESHOLD`;
/// * `Healthy` otherwise.
pub(crate) fn parse_sas_expiry_status(token: &str, now: DateTime<Utc>) -> SasExpiryStatus {
    let se_pair = token
        .trim_start_matches('?')
        .split('&')
        .map(|pair| pair.split_once('=').map_or((pair, None), |(k, v)| (k, Some(v))))
        .find(|(key, _)| *key == "se");

    let Some((_, value)) = se_pair else {
        return SasExpiryStatus::NoExpiry;
    };
    let Some(raw) = value else {
        // A bare `se` with no `=` carries no value to report.
        return SasExpiryStatus::Unparseable { raw: String::new() };
    };

    let decoded = raw
        .replace("%3A", ":")
        .replace("%3a", ":")
        .replace("%2B", "+")
        .replace("%2b", "+");

    // Try strict RFC 3339 first; only on failure retry with seconds padded in, and keep
    // the strict error when the value is not minute-precision shaped.
    let parsed = DateTime::parse_from_rfc3339(&decoded).or_else(|strict_err| {
        pad_missing_seconds(&decoded)
            .ok_or(strict_err)
            .and_then(|padded| DateTime::parse_from_rfc3339(&padded))
    });
    let Ok(parsed) = parsed else {
        // Report the decoded value as received (not the padded retry), without re-cloning.
        return SasExpiryStatus::Unparseable { raw: decoded };
    };

    let expires_at = parsed.with_timezone(&Utc);
    if expires_at <= now {
        return SasExpiryStatus::Expired { expires_at };
    }
    let remaining = expires_at - now;
    if remaining <= SAS_NEAR_EXPIRY_THRESHOLD {
        SasExpiryStatus::NearExpiry {
            expires_at,
            remaining,
        }
    } else {
        SasExpiryStatus::Healthy { expires_at }
    }
}
/// Preflight check on a SAS token's expiry, evaluated against the current UTC time.
///
/// Returns an error only when the token is already expired. A token close to expiry, or
/// one whose `se=` value cannot be parsed, produces a `log::warn!` and is accepted.
/// Healthy tokens and tokens without any `se` parameter pass silently.
fn enforce_sas_expiry(token: &str) -> Result<()> {
    let status = parse_sas_expiry_status(token, Utc::now());
    match status {
        SasExpiryStatus::Expired { expires_at } => anyhow::bail!(
            "Azure SAS token already expired (se={}). Generate a new SAS and re-export.",
            expires_at.to_rfc3339()
        ),
        SasExpiryStatus::NearExpiry {
            expires_at,
            remaining,
        } => {
            // Clamp so a negative duration can never be printed.
            let minutes_left = remaining.num_minutes().max(0);
            let plural_suffix = if minutes_left == 1 { "" } else { "s" };
            log::warn!(
                "Azure SAS token expires in {} minute{} ({}). Long exports may fail mid-run; rotate the token before extraction.",
                minutes_left,
                plural_suffix,
                expires_at.to_rfc3339()
            );
        }
        SasExpiryStatus::Unparseable { raw } => {
            log::warn!(
                "Azure SAS token has unparseable 'se=' value ({:?}); skipping expiry check. The token may still authenticate, but Rivet cannot warn on near-expiry.",
                raw
            );
        }
        SasExpiryStatus::Healthy { .. } | SasExpiryStatus::NoExpiry => {}
    }
    Ok(())
}
/// Azure Blob Storage destination: `CloudDestination` parameterised with `AzureBackend`.
pub type AzureDestination = CloudDestination<AzureBackend>;
/// Stateless marker type that carries the Azure implementation of `CloudBackend`.
pub struct AzureBackend;
/// Reads a credential from the environment variable `env_name` and wraps it in
/// `zeroize::Zeroizing`.
///
/// `label` names the kind of credential (e.g. "account key") and is used only in the
/// error message. Any failure of `std::env::var` is reported as the variable not being set.
fn read_credential_env(env_name: &str, label: &str) -> Result<zeroize::Zeroizing<String>> {
    match std::env::var(env_name) {
        Ok(secret) => Ok(zeroize::Zeroizing::new(secret)),
        Err(_) => Err(anyhow::anyhow!(
            "env var '{}' not set for Azure {}",
            env_name,
            label
        )),
    }
}
/// Azure wiring for `CloudBackend`: turns a `DestinationConfig` into an `Azblob`-backed
/// `Operator`.
impl CloudBackend for AzureBackend {
    const RUNTIME_LABEL: &'static str = "Azure";
    const SCHEME: &'static str = "az";

    /// Builds the operator for the container named by `config.bucket`.
    ///
    /// Endpoint: an explicit `config.endpoint` is used as-is; otherwise, when
    /// `config.account_name` is set, `https://{account_name}.blob.core.windows.net` is used;
    /// with neither, no endpoint is set on the builder.
    ///
    /// Auth: with `allow_anonymous` no credentials are configured. Otherwise `account_name`
    /// is required together with exactly one of `account_key_env` / `sas_token_env`, each
    /// naming an environment variable holding the secret. A SAS token has leading `?`
    /// stripped and must pass `enforce_sas_expiry`.
    ///
    /// # Errors
    /// Missing `bucket`, missing `account_name`, both or neither auth modes configured,
    /// an unreadable credential variable, an expired SAS token, or a failure from
    /// `Operator::new`.
    fn build_operator(config: &DestinationConfig) -> Result<Operator> {
        let Some(container) = config.bucket.as_deref() else {
            anyhow::bail!("Azure destination requires 'bucket' (container name)");
        };
        let mut builder = Azblob::default().container(container);

        // Explicit endpoint wins over the one derived from the account name.
        if let Some(endpoint) = &config.endpoint {
            builder = builder.endpoint(endpoint);
        } else if let Some(account_name) = &config.account_name {
            let derived = format!("https://{account_name}.blob.core.windows.net");
            builder = builder.endpoint(&derived);
        }

        if config.allow_anonymous {
            log::info!("Azure: allow_anonymous (Azurite emulator or public container)");
        } else {
            let Some(account_name) = config.account_name.as_deref() else {
                anyhow::bail!(
                    "Azure destination requires 'account_name' (or 'allow_anonymous: true' for Azurite)"
                );
            };
            builder = builder.account_name(account_name);

            let key_env = config.account_key_env.as_deref();
            let sas_env = config.sas_token_env.as_deref();
            match (key_env, sas_env) {
                (None, None) => anyhow::bail!(
                    "Azure destination requires 'account_key_env' or 'sas_token_env' (or 'allow_anonymous: true' for Azurite)"
                ),
                (Some(_), Some(_)) => anyhow::bail!(
                    "Azure destination: 'account_key_env' and 'sas_token_env' are mutually exclusive — pick one auth mode"
                ),
                (Some(env_name), None) => {
                    let key = read_credential_env(env_name, "account key")?;
                    builder = builder.account_key(key.as_str());
                }
                (None, Some(env_name)) => {
                    let raw = read_credential_env(env_name, "SAS token")?;
                    let token = raw.trim_start_matches('?');
                    // Reject an already-expired token before handing it to the builder.
                    enforce_sas_expiry(token)?;
                    builder = builder.sas_token(token);
                }
            }
        }

        Ok(Operator::new(builder)?.finish())
    }
}
// Unit tests for Azure destination construction and for SAS expiry classification.
//
// NOTE(review): several tests mutate process environment variables inside `unsafe` blocks.
// Each test uses variable names that appear in no other test of this module, but the
// `remove_var` cleanup runs after the assertions, so a failing assertion leaves the
// variable set. Confirm this is acceptable for how the test harness schedules tests.
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{DestinationConfig, DestinationType};
// Builds an `AzureDestination` from `cfg` and returns the error rendered with `{:#}`;
// panics if construction unexpectedly succeeds.
fn expect_err(cfg: &DestinationConfig) -> String {
match AzureDestination::new(cfg) {
Err(e) => format!("{e:#}"),
Ok(_) => panic!("expected error, got Ok"),
}
}
// A config without `bucket` must fail with a message mentioning the bucket/container.
#[test]
fn azure_destination_requires_bucket() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
account_name: Some("acct".into()),
account_key_env: Some("AZURE_TEST_KEY_X".into()),
..Default::default()
};
unsafe { std::env::set_var("AZURE_TEST_KEY_X", "fake") };
let msg = expect_err(&cfg);
assert!(
msg.contains("bucket") || msg.contains("container"),
"missing bucket should be reported: {msg}"
);
unsafe { std::env::remove_var("AZURE_TEST_KEY_X") };
}
// Without `allow_anonymous`, a missing `account_name` must be reported by name.
#[test]
fn azure_destination_requires_account_name_when_not_anonymous() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
bucket: Some("data".into()),
account_key_env: Some("AZURE_TEST_KEY_Y".into()),
..Default::default()
};
unsafe { std::env::set_var("AZURE_TEST_KEY_Y", "fake") };
let msg = expect_err(&cfg);
assert!(
msg.contains("account_name"),
"missing account_name should be reported: {msg}"
);
unsafe { std::env::remove_var("AZURE_TEST_KEY_Y") };
}
// With neither auth mode configured, the error must mention `account_key_env`.
#[test]
fn azure_destination_requires_account_key_env_when_not_anonymous() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
bucket: Some("data".into()),
account_name: Some("acct".into()),
..Default::default()
};
let msg = expect_err(&cfg);
assert!(
msg.contains("account_key_env"),
"missing account_key_env should be reported: {msg}"
);
}
// When the env var named by `account_key_env` is unset, the error must include both
// the variable name and the "account key" label.
#[test]
fn azure_destination_account_key_env_missing_var_errors() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
bucket: Some("data".into()),
account_name: Some("acct".into()),
account_key_env: Some("RIVET_TEST_AZURE_KEY_UNSET_Z".into()),
..Default::default()
};
unsafe { std::env::remove_var("RIVET_TEST_AZURE_KEY_UNSET_Z") };
let msg = expect_err(&cfg);
assert!(
msg.contains("RIVET_TEST_AZURE_KEY_UNSET_Z"),
"missing env var name in error: {msg}"
);
assert!(
msg.contains("account key"),
"missing credential label in error: {msg}"
);
}
// `allow_anonymous` must not trigger credential-related errors. The assertion only runs
// when construction fails; a successful construction passes the test trivially.
#[test]
fn azure_destination_allow_anonymous_skips_credential_requirements() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
bucket: Some("data".into()),
endpoint: Some("http://127.0.0.1:10000/devstoreaccount1".into()),
allow_anonymous: true,
..Default::default()
};
let err = AzureDestination::new(&cfg).err().map(|e| format!("{e:#}"));
if let Some(msg) = err {
assert!(
!msg.contains("account_name") && !msg.contains("account_key_env"),
"allow_anonymous must bypass credential checks, got: {msg}"
);
}
}
// With no explicit endpoint, construction must not fail with "endpoint is empty".
// Only checks the error text when construction fails.
#[test]
fn azure_destination_auto_derives_endpoint_from_account_name() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
bucket: Some("data".into()),
account_name: Some("mystorageacct".into()),
account_key_env: Some("RIVET_TEST_AZURE_ENDPOINT_FAKE".into()),
..Default::default()
};
unsafe { std::env::set_var("RIVET_TEST_AZURE_ENDPOINT_FAKE", "fake-key-value") };
let result = AzureDestination::new(&cfg);
if let Err(e) = result {
let msg = format!("{e:#}").to_lowercase();
assert!(
!msg.contains("endpoint is empty"),
"auto-derive must populate endpoint from account_name; got: {msg}"
);
}
unsafe { std::env::remove_var("RIVET_TEST_AZURE_ENDPOINT_FAKE") };
}
// With both `endpoint` and `account_name` set, construction must not fail with
// "endpoint is empty". Only checks the error text when construction fails.
#[test]
fn azure_destination_explicit_endpoint_wins_over_derived() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
bucket: Some("data".into()),
endpoint: Some("http://127.0.0.1:10000/devstoreaccount1".into()),
account_name: Some("devstoreaccount1".into()),
account_key_env: Some("RIVET_TEST_AZURE_EXPLICIT_EP_FAKE".into()),
..Default::default()
};
unsafe { std::env::set_var("RIVET_TEST_AZURE_EXPLICIT_EP_FAKE", "fake") };
let result = AzureDestination::new(&cfg);
if let Err(e) = result {
let msg = format!("{e:#}").to_lowercase();
assert!(
!msg.contains("endpoint is empty"),
"explicit endpoint must reach builder; got: {msg}"
);
}
unsafe { std::env::remove_var("RIVET_TEST_AZURE_EXPLICIT_EP_FAKE") };
}
// An account-key style YAML document deserializes into the expected config fields.
#[test]
fn azure_destination_config_parses_from_yaml() {
let yaml = r#"
type: azure
bucket: my-container
account_name: mystorageacct
account_key_env: AZURE_STORAGE_KEY
"#;
let cfg: DestinationConfig = serde_yaml_ng::from_str(yaml).unwrap();
assert_eq!(cfg.destination_type, DestinationType::Azure);
assert_eq!(cfg.bucket.as_deref(), Some("my-container"));
assert_eq!(cfg.account_name.as_deref(), Some("mystorageacct"));
assert_eq!(cfg.account_key_env.as_deref(), Some("AZURE_STORAGE_KEY"));
}
// Supplying only `sas_token_env` must not produce an error about `account_key_env`.
// The token here has no `se=` parameter. Only checks the error text on failure.
#[test]
fn azure_destination_sas_token_env_satisfies_auth() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
bucket: Some("data".into()),
account_name: Some("mystorageacct".into()),
sas_token_env: Some("RIVET_TEST_AZURE_SAS_OK".into()),
..Default::default()
};
unsafe { std::env::set_var("RIVET_TEST_AZURE_SAS_OK", "sv=2021-08-06&sig=fake") };
let result = AzureDestination::new(&cfg);
if let Err(e) = result {
let msg = format!("{e:#}");
assert!(
!msg.contains("account_key_env"),
"SAS-token auth must bypass account_key_env requirement; got: {msg}"
);
}
unsafe { std::env::remove_var("RIVET_TEST_AZURE_SAS_OK") };
}
// Configuring both auth modes must fail, naming both fields and saying "mutually exclusive".
#[test]
fn azure_destination_rejects_account_key_and_sas_token_together() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
bucket: Some("data".into()),
account_name: Some("acct".into()),
account_key_env: Some("RIVET_TEST_AZURE_BOTH_KEY".into()),
sas_token_env: Some("RIVET_TEST_AZURE_BOTH_SAS".into()),
..Default::default()
};
unsafe { std::env::set_var("RIVET_TEST_AZURE_BOTH_KEY", "fake") };
unsafe { std::env::set_var("RIVET_TEST_AZURE_BOTH_SAS", "fake") };
let msg = expect_err(&cfg);
assert!(
msg.contains("account_key_env") && msg.contains("sas_token_env"),
"error must name both fields: {msg}"
);
assert!(
msg.contains("mutually exclusive"),
"error must say mutually exclusive: {msg}"
);
unsafe { std::env::remove_var("RIVET_TEST_AZURE_BOTH_KEY") };
unsafe { std::env::remove_var("RIVET_TEST_AZURE_BOTH_SAS") };
}
// Configuring neither auth mode must fail with an error naming both options.
#[test]
fn azure_destination_missing_both_account_key_and_sas_token_errors() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
bucket: Some("data".into()),
account_name: Some("acct".into()),
..Default::default()
};
let msg = expect_err(&cfg);
assert!(
msg.contains("account_key_env") && msg.contains("sas_token_env"),
"error must name both auth modes: {msg}"
);
}
// When the env var named by `sas_token_env` is unset, the error must include both the
// variable name and the "SAS token" label.
#[test]
fn azure_destination_sas_token_env_missing_var_errors_with_label() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
bucket: Some("data".into()),
account_name: Some("acct".into()),
sas_token_env: Some("RIVET_TEST_AZURE_SAS_UNSET".into()),
..Default::default()
};
unsafe { std::env::remove_var("RIVET_TEST_AZURE_SAS_UNSET") };
let msg = expect_err(&cfg);
assert!(
msg.contains("RIVET_TEST_AZURE_SAS_UNSET"),
"missing env var name in error: {msg}"
);
assert!(
msg.contains("SAS token"),
"credential label must mention SAS token: {msg}"
);
}
// A SAS-token style YAML document deserializes with `sas_token_env` set and
// `account_key_env` absent.
#[test]
fn azure_destination_sas_token_config_parses_from_yaml() {
let yaml = r#"
type: azure
bucket: my-container
account_name: mystorageacct
sas_token_env: AZURE_STORAGE_SAS_TOKEN
"#;
let cfg: DestinationConfig = serde_yaml_ng::from_str(yaml).unwrap();
assert_eq!(cfg.destination_type, DestinationType::Azure);
assert_eq!(cfg.bucket.as_deref(), Some("my-container"));
assert_eq!(cfg.account_name.as_deref(), Some("mystorageacct"));
assert_eq!(
cfg.sas_token_env.as_deref(),
Some("AZURE_STORAGE_SAS_TOKEN")
);
assert!(cfg.account_key_env.is_none());
}
// `TimeZone` provides `Utc.with_ymd_and_hms`, used to build the fixed "now" values below.
use chrono::TimeZone;
// Parses an RFC 3339 literal into a UTC `DateTime`; panics (`unwrap`) on invalid input.
fn ts(s: &str) -> DateTime<Utc> {
DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc)
}
// An expiry decades after `now` is classified as Healthy with the parsed instant.
#[test]
fn sas_expiry_parses_far_future_as_healthy() {
let token = "sv=2021-08-06&se=2099-01-01T00:00:00Z&sig=fake";
let now = Utc.with_ymd_and_hms(2026, 5, 23, 12, 0, 0).unwrap();
match parse_sas_expiry_status(token, now) {
SasExpiryStatus::Healthy { expires_at } => {
assert_eq!(expires_at, ts("2099-01-01T00:00:00Z"));
}
other => panic!("expected Healthy, got {other:?}"),
}
}
// An expiry 30 minutes after `now` is NearExpiry with 30 minutes remaining.
#[test]
fn sas_expiry_parses_within_threshold_as_near_expiry() {
let token = "sv=2021-08-06&se=2026-05-23T12:30:00Z&sig=fake";
let now = Utc.with_ymd_and_hms(2026, 5, 23, 12, 0, 0).unwrap();
match parse_sas_expiry_status(token, now) {
SasExpiryStatus::NearExpiry {
expires_at,
remaining,
} => {
assert_eq!(expires_at, ts("2026-05-23T12:30:00Z"));
assert_eq!(remaining.num_minutes(), 30);
}
other => panic!("expected NearExpiry, got {other:?}"),
}
}
// An expiry before `now` is classified as Expired with the parsed instant.
#[test]
fn sas_expiry_parses_past_as_expired() {
let token = "sv=2021-08-06&se=2024-01-01T00:00:00Z&sig=fake";
let now = Utc.with_ymd_and_hms(2026, 5, 23, 12, 0, 0).unwrap();
match parse_sas_expiry_status(token, now) {
SasExpiryStatus::Expired { expires_at } => {
assert_eq!(expires_at, ts("2024-01-01T00:00:00Z"));
}
other => panic!("expected Expired, got {other:?}"),
}
}
// Boundary: an expiry exactly equal to `now` counts as Expired.
#[test]
fn sas_expiry_treats_now_equals_se_as_expired() {
let token = "sv=2021-08-06&se=2026-05-23T12:00:00Z&sig=fake";
let now = Utc.with_ymd_and_hms(2026, 5, 23, 12, 0, 0).unwrap();
match parse_sas_expiry_status(token, now) {
SasExpiryStatus::Expired { .. } => {}
other => panic!("expected Expired, got {other:?}"),
}
}
// `%3A` in the `se` value is decoded to `:` before parsing.
#[test]
fn sas_expiry_handles_url_encoded_colons() {
let token = "sv=2021-08-06&se=2099-01-01T00%3A00%3A00Z&sig=fake";
let now = Utc.with_ymd_and_hms(2026, 5, 23, 12, 0, 0).unwrap();
assert!(matches!(
parse_sas_expiry_status(token, now),
SasExpiryStatus::Healthy { .. }
));
}
// `%2B` in the `se` value is decoded to `+`, so a `+00:00` offset parses.
#[test]
fn sas_expiry_handles_url_encoded_plus_in_timezone_offset() {
let token = "sv=2021-08-06&se=2024-01-01T00%3A00%3A00%2B00%3A00&sig=fake";
let now = Utc.with_ymd_and_hms(2026, 5, 23, 12, 0, 0).unwrap();
assert!(matches!(
parse_sas_expiry_status(token, now),
SasExpiryStatus::Expired { .. }
));
}
// A token with no `se` parameter yields NoExpiry.
#[test]
fn sas_expiry_returns_no_expiry_when_se_missing() {
let token = "sv=2021-08-06&si=mypolicy&sig=fake";
let now = Utc.with_ymd_and_hms(2026, 5, 23, 12, 0, 0).unwrap();
assert_eq!(
parse_sas_expiry_status(token, now),
SasExpiryStatus::NoExpiry
);
}
// A non-timestamp `se` value yields Unparseable carrying that value.
#[test]
fn sas_expiry_returns_unparseable_for_bad_format() {
let token = "sv=2021-08-06&se=NOT_A_DATE&sig=fake";
let now = Utc.with_ymd_and_hms(2026, 5, 23, 12, 0, 0).unwrap();
match parse_sas_expiry_status(token, now) {
SasExpiryStatus::Unparseable { raw } => assert_eq!(raw, "NOT_A_DATE"),
other => panic!("expected Unparseable, got {other:?}"),
}
}
// A leading `?` on the token does not prevent the `se` parameter from being found.
#[test]
fn sas_expiry_tolerates_leading_question_mark() {
let token = "?sv=2021-08-06&se=2099-01-01T00:00:00Z&sig=fake";
let now = Utc.with_ymd_and_hms(2026, 5, 23, 12, 0, 0).unwrap();
assert!(matches!(
parse_sas_expiry_status(token, now),
SasExpiryStatus::Healthy { .. }
));
}
// Boundary: remaining time exactly equal to the threshold is NearExpiry.
#[test]
fn sas_expiry_at_threshold_is_near_expiry() {
let now = Utc.with_ymd_and_hms(2026, 5, 23, 12, 0, 0).unwrap();
let exactly_at_threshold = now + SAS_NEAR_EXPIRY_THRESHOLD;
let token = format!(
"sv=2021-08-06&se={}&sig=fake",
exactly_at_threshold.to_rfc3339()
);
assert!(matches!(
parse_sas_expiry_status(&token, now),
SasExpiryStatus::NearExpiry { .. }
));
}
// Boundary: one second beyond the threshold is Healthy.
#[test]
fn sas_expiry_one_second_above_threshold_is_healthy() {
let now = Utc.with_ymd_and_hms(2026, 5, 23, 12, 0, 0).unwrap();
let above_threshold = now + SAS_NEAR_EXPIRY_THRESHOLD + chrono::Duration::seconds(1);
let token = format!("sv=2021-08-06&se={}&sig=fake", above_threshold.to_rfc3339());
assert!(matches!(
parse_sas_expiry_status(&token, now),
SasExpiryStatus::Healthy { .. }
));
}
// End-to-end: constructing a destination with an expired SAS token must fail with an
// expiry-related error message.
#[test]
fn azure_destination_rejects_expired_sas_token() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
bucket: Some("data".into()),
account_name: Some("mystorageacct".into()),
sas_token_env: Some("RIVET_TEST_AZURE_SAS_EXPIRED".into()),
..Default::default()
};
unsafe {
std::env::set_var(
"RIVET_TEST_AZURE_SAS_EXPIRED",
"sv=2021-08-06&se=2024-01-01T00:00:00Z&sig=fake",
)
};
let msg = expect_err(&cfg);
assert!(
msg.contains("expired") || msg.contains("Expired"),
"expired SAS must surface as expiry error: {msg}"
);
unsafe { std::env::remove_var("RIVET_TEST_AZURE_SAS_EXPIRED") };
}
// End-to-end: a far-future SAS token must not produce an expiry error. Only checks the
// error text when construction fails for some other reason.
#[test]
fn azure_destination_accepts_far_future_sas_token() {
let cfg = DestinationConfig {
destination_type: DestinationType::Azure,
bucket: Some("data".into()),
account_name: Some("mystorageacct".into()),
sas_token_env: Some("RIVET_TEST_AZURE_SAS_HEALTHY".into()),
..Default::default()
};
unsafe {
std::env::set_var(
"RIVET_TEST_AZURE_SAS_HEALTHY",
"sv=2021-08-06&se=2099-01-01T00:00:00Z&sig=fake",
)
};
if let Err(e) = AzureDestination::new(&cfg) {
let msg = format!("{e:#}");
assert!(
!msg.contains("expired") && !msg.contains("Expired"),
"healthy SAS must pass expiry preflight; got: {msg}"
);
}
unsafe { std::env::remove_var("RIVET_TEST_AZURE_SAS_HEALTHY") };
}
}