use std::collections::BTreeMap;
use std::time::Instant;
use anyhow::{anyhow, bail, Context, Result};
use secretenv_core::{
resolve_registry, AliasMap, Backend, BackendRegistry, BackendUri, Config, RegistryCache,
RegistrySelection, Secret,
};
use secretenv_telemetry::span::{MigrateOutcome, MigratePhase, SecretEnvSpan};
/// Caller-supplied inputs for one `migrate` invocation.
#[derive(Debug, Clone)]
pub struct MigrateArgs {
/// Registry alias whose value is being moved.
pub alias: String,
/// Destination as a raw URI string; it must parse as a direct backend URI (an alias URI is rejected).
pub dest_uri: String,
/// Optional explicit source URI (named `--from` in error messages). When `None`, the source is the
/// alias's current target in the resolved registry.
pub source_uri: Option<String>,
/// Optional registry selector: a value containing `://` is parsed as a URI, anything else is used
/// as a registry name.
pub registry: Option<String>,
/// When true, the run stops after the probe phase; nothing is read, written, or re-pointed.
pub dry_run: bool,
/// When true, the source value may be deleted after the pointer flip, subject to the consent callback.
pub delete_source: bool,
}
/// Fully resolved and validated description of one migration, produced by `build_migration_plan`.
#[derive(Debug, Clone)]
pub struct MigrationPlan {
/// Alias being migrated.
pub alias: String,
/// Backend URI the value is read from (and optionally deleted from afterwards).
pub source_uri: BackendUri,
/// Backend URI the value is written to; the alias points here after the pointer flip.
pub dest_uri: BackendUri,
/// URI of the registry document that is rewritten during the pointer flip
/// (the resolved alias map's `primary_source()`).
pub registry_source_uri: BackendUri,
/// Identifier generated for this run; recorded on the telemetry span and echoed in the report.
pub transaction_id: String,
}
/// Elapsed time of each migrate phase, in whole milliseconds.
///
/// Values are measured with `Instant` and saturate at `u64::MAX`. Phases that did not run keep
/// their `Default` value (`0`, or `None` for the optional delete).
#[derive(Debug, Default, Clone, Copy)]
#[allow(clippy::struct_field_names)]
pub struct PhaseDurations {
/// Time spent probing the source and destination backends.
pub probe_ms: u64,
/// Time spent reading the value from the source backend.
pub read_ms: u64,
/// Time spent writing the value to the destination backend.
pub write_ms: u64,
/// Time spent rewriting the registry document so the alias points at the destination.
pub pointer_flip_ms: u64,
/// Time spent deleting the source value; `Some` only when the delete ran and succeeded.
pub source_delete_ms: Option<u64>,
}
/// Final state of a migrate run as carried in a `MigrateReport`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrateReportOutcome {
/// Read, write, and pointer flip all succeeded, and no source delete failed.
Success,
// Not constructed by the non-test code visible in this file: a failed pointer flip is returned
// as an `Err` carrying `PointerFlipFailed` rather than as a report. Hence the `dead_code` allowance.
#[allow(dead_code)] #[doc(hidden)]
PartialFailurePointerFlip,
/// The migration itself committed, but the requested source delete returned an error.
SourceDeleteFailedPostCommit,
/// Only the probe phase ran because `dry_run` was set.
DryRun,
}
impl MigrateReportOutcome {
const fn as_telemetry(self) -> MigrateOutcome {
match self {
Self::Success => MigrateOutcome::Ok,
Self::SourceDeleteFailedPostCommit => MigrateOutcome::OkWithCleanupFailure,
Self::PartialFailurePointerFlip => MigrateOutcome::PartialFailure,
Self::DryRun => MigrateOutcome::DryRun,
}
}
}
/// Summary of a migrate run, returned on success (dry runs included).
#[derive(Debug, Clone)]
pub struct MigrateReport {
/// Alias that was migrated.
pub alias: String,
/// `backend_type()` of the source backend.
pub source_backend_type: String,
/// `backend_type()` of the destination backend.
pub dest_backend_type: String,
/// How the run ended.
pub outcome: MigrateReportOutcome,
/// Per-phase timings; phases that did not run keep their default values.
pub phase_durations: PhaseDurations,
/// Echo of the caller's `delete_source` request (not whether a delete actually happened).
pub delete_source: bool,
/// Backend-provided hint for removing the source value by hand; `None` once the source has been
/// deleted successfully.
pub delete_hint: Option<String>,
/// Identifier of this run, copied from the plan.
pub transaction_id: String,
/// `(backend instance name, status label)` pairs collected during the probe phase.
pub probe_results: Vec<(String, String)>,
}
/// Validates a migrate request and resolves it into a [`MigrationPlan`].
///
/// Steps, in order:
/// 1. Parse `args.dest_uri`; reject alias URIs and backend instances that are not configured.
/// 2. Select the registry (`args.registry`, then `SECRETENV_REGISTRY`, then `default`) and
///    resolve its alias cascade.
/// 3. Take the source from `args.source_uri` when given, otherwise from the alias's current
///    target in the resolved registry.
/// 4. Reject a source that is itself an alias, that names an unconfigured backend instance,
///    or that is identical to the destination.
///
/// No secret value is read or written here; only registry documents are consulted.
///
/// # Errors
///
/// Returns an error when any of the validations above fails, when the registry cannot be
/// selected or resolved, or when the alias is absent from the registry and no explicit source
/// was supplied.
pub async fn build_migration_plan(
    args: &MigrateArgs,
    config: &Config,
    backends: &BackendRegistry,
) -> Result<MigrationPlan> {
    let dest_uri = BackendUri::parse(&args.dest_uri)
        .with_context(|| format!("destination '{}' is not a valid URI", args.dest_uri))?;
    if dest_uri.is_alias() {
        bail!("destination must be a direct backend URI, not a secretenv:// alias");
    }
    if backends.get(&dest_uri.scheme).is_none() {
        bail!(
            "destination '{}' references backend instance '{}' which is not configured",
            args.dest_uri,
            dest_uri.scheme
        );
    }

    let selection = registry_selection(args.registry.as_deref(), config)?;
    let mut cache = RegistryCache::new();
    let aliases = resolve_registry(config, &selection, backends, &mut cache).await?;

    let source_uri = if let Some(explicit) = &args.source_uri {
        BackendUri::parse(explicit)
            .with_context(|| format!("--from '{explicit}' is not a valid URI"))?
    } else {
        let (target, _src) = aliases.get(&args.alias).ok_or_else(|| {
            anyhow!(
                "alias '{}' not found in registry cascade [{}]",
                args.alias,
                format_sources(&aliases)
            )
        })?;
        target.clone()
    };
    if source_uri.is_alias() {
        bail!(
            "alias '{}' resolved to another alias ('{}') — migrate operates on backend URIs only",
            args.alias,
            source_uri.raw
        );
    }
    if backends.get(&source_uri.scheme).is_none() {
        bail!(
            "source '{}' references backend instance '{}' which is not configured",
            source_uri.raw,
            source_uri.scheme
        );
    }
    // Migrating a location onto itself is never useful, and with `delete_source` it is
    // destructive: the value would be written, the alias re-pointed to the same URI, and then
    // that very URI deleted, leaving the alias dangling with no copy of the secret anywhere.
    if source_uri.raw == dest_uri.raw {
        bail!(
            "source and destination are the same URI ('{}') — nothing to migrate",
            source_uri.raw
        );
    }

    let registry_source_uri = aliases.primary_source().clone();
    Ok(MigrationPlan {
        alias: args.alias.clone(),
        source_uri,
        dest_uri,
        registry_source_uri,
        transaction_id: new_transaction_id(),
    })
}
/// Error context attached when the registry pointer flip fails after the destination write
/// has already succeeded, i.e. the value now exists in both backends.
///
/// The extra fields are carried for the caller to use when guiding the operator; the
/// `Display` text deliberately mentions only the alias.
#[derive(Debug)]
pub struct PointerFlipFailed {
    /// Alias whose registry entry could not be re-pointed.
    pub alias: String,
    /// Raw destination URI that already holds the value.
    pub dest_uri_raw: String,
    /// Backend-provided hint for removing the value from the destination by hand.
    pub dest_delete_hint: String,
}

impl std::fmt::Display for PointerFlipFailed {
    /// Renders a one-line summary containing the alias but neither the URI nor the hint.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure so that only the alias can reach the formatted output.
        let Self { alias, .. } = self;
        write!(
            f,
            "pointer flip failed for alias '{}' after destination write succeeded; \
             value exists in both backends — operator action required (see stderr).",
            alias
        )
    }
}

impl std::error::Error for PointerFlipFailed {}
/// Convenience entry point: builds the plan from `args` and immediately executes it.
///
/// `post_commit_source_delete_consent` is handed through unchanged to
/// `migrate_with_plan`, which consults it only after the pointer flip has succeeded and
/// only when `args.delete_source` is set.
///
/// # Errors
///
/// Propagates any error from plan construction or from the migration itself.
#[allow(dead_code)]
pub async fn migrate<F>(
    args: MigrateArgs,
    config: &Config,
    backends: &BackendRegistry,
    post_commit_source_delete_consent: F,
) -> Result<MigrateReport>
where
    F: FnOnce(&MigrationPlan) -> bool,
{
    let resolved_plan = build_migration_plan(&args, config, backends).await?;
    let report =
        migrate_with_plan(resolved_plan, &args, backends, post_commit_source_delete_consent)
            .await?;
    Ok(report)
}
/// Executes a previously built plan: probe, read, write, registry pointer flip, and
/// (optionally) source delete, recording progress on a `secretenv.migrate` telemetry span.
///
/// Phase order and failure behaviour:
/// - Probe failure: returned as-is (no outcome is recorded on the span by this function).
/// - `args.dry_run`: returns a `DryRun` report right after the probe; nothing else runs.
/// - Read failure / write failure: the matching outcome is recorded and the error returned.
/// - Pointer-flip failure: the value already exists at the destination, so the error is
///   returned wrapped in a `PointerFlipFailed` context.
/// - Source delete runs only when `args.delete_source` is set AND the consent callback
///   returns true; a failed delete does not fail the call, it only changes the outcome.
///
/// # Errors
///
/// Returns an error when a backend named by the plan is not configured, or when the probe,
/// read, write, or pointer-flip phase fails.
pub async fn migrate_with_plan<F>(
plan: MigrationPlan,
args: &MigrateArgs,
backends: &BackendRegistry,
post_commit_source_delete_consent: F,
) -> Result<MigrateReport>
where
F: FnOnce(&MigrationPlan) -> bool,
{
let (mut span, _guard) = SecretEnvSpan::start("secretenv.migrate");
span.record_command("migrate")
.record_alias_name(&plan.alias)
.record_migrate_transaction_id(&plan.transaction_id);
let source = backend_for(backends, &plan.source_uri)?;
let dest = backend_for(backends, &plan.dest_uri)?;
span.record_migrate_source_backend_type(source.backend_type())
.record_migrate_dest_backend_type(dest.backend_type())
.record_migrate_delete_source(args.delete_source);
// Phase 1: probe both backends before any secret material is touched.
let (probe_ms, probe_results) = probe_phase(&plan, source, dest).await?;
if args.dry_run {
// Dry run ends here: only the probe timing is filled in, every other phase stays at its default.
span.record_migrate_outcome(MigrateOutcome::DryRun);
return Ok(MigrateReport {
alias: plan.alias.clone(),
source_backend_type: source.backend_type().to_owned(),
dest_backend_type: dest.backend_type().to_owned(),
outcome: MigrateReportOutcome::DryRun,
phase_durations: PhaseDurations { probe_ms, ..PhaseDurations::default() },
delete_source: args.delete_source,
delete_hint: Some(source.delete_hint(&plan.source_uri)),
transaction_id: plan.transaction_id,
probe_results,
});
}
// Phase 2: read the value from the source.
let (value, read_ms) = match migrate_read(&plan, source).await {
Ok(v) => v,
Err(e) => {
span.record_migrate_outcome(MigrateOutcome::SourceReadFailed);
return Err(e);
}
};
// Phase 3: write the value to the destination.
let write_ms = match migrate_write(&plan, dest, &value).await {
Ok(ms) => ms,
Err(e) => {
span.record_migrate_outcome(MigrateOutcome::DestWriteFailed);
return Err(e);
}
};
// The secret is not used past this point; it is released before the registry is touched
// (presumably to keep it in memory no longer than necessary).
drop(value);
// Phase 4: re-point the alias at the destination. This is the commit point of the migration.
let flip_start = Instant::now();
let flip_result = migrate_registry_flip(&plan, backends).await;
let flip_ms = u64::try_from(flip_start.elapsed().as_millis()).unwrap_or(u64::MAX);
if let Err(flip_err) = flip_result {
// The destination now holds a copy while the alias still points at the source; attach the
// details the caller needs to guide manual cleanup.
span.record_migrate_phase(MigratePhase::PointerFlip)
.record_migrate_outcome(MigrateOutcome::PartialFailure);
return Err(flip_err.context(PointerFlipFailed {
alias: plan.alias.clone(),
dest_uri_raw: plan.dest_uri.raw.clone(),
dest_delete_hint: dest.delete_hint(&plan.dest_uri),
}));
}
let mut source_delete_ms = None;
let mut outcome = MigrateReportOutcome::Success;
// The manual-delete hint stays in the report unless the source is actually removed below.
let mut delete_hint = Some(source.delete_hint(&plan.source_uri));
// Phase 5 (optional): `&&` short-circuits, so the consent callback is invoked only when a
// delete was requested.
if args.delete_source && post_commit_source_delete_consent(&plan) {
match migrate_source_delete(&plan, source).await {
Ok(ms) => {
source_delete_ms = Some(ms);
delete_hint = None;
}
// The migration is already committed, so a failed cleanup is reported through the
// outcome instead of failing the call; the error value itself is discarded here.
Err(_e) => {
outcome = MigrateReportOutcome::SourceDeleteFailedPostCommit;
}
}
}
span.record_migrate_outcome(outcome.as_telemetry());
Ok(MigrateReport {
alias: plan.alias.clone(),
source_backend_type: source.backend_type().to_owned(),
dest_backend_type: dest.backend_type().to_owned(),
outcome,
phase_durations: PhaseDurations {
probe_ms,
read_ms,
write_ms,
pointer_flip_ms: flip_ms,
source_delete_ms,
},
delete_source: args.delete_source,
delete_hint,
transaction_id: plan.transaction_id,
probe_results,
})
}
/// Probes both backends before any secret is moved.
///
/// The source is health-checked with `check()`; its status is only recorded as a label and
/// does not abort the run. The destination is asked to `probe_write` at the destination URI;
/// a failure there aborts with an error.
///
/// Returns the elapsed milliseconds (saturating at `u64::MAX`) and one
/// `(instance name, status label)` pair per backend, source first.
///
/// # Errors
///
/// Returns the destination's probe error, wrapped with context naming the alias, the
/// destination instance, and the destination URI.
async fn probe_phase(
    plan: &MigrationPlan,
    source: &dyn Backend,
    dest: &dyn Backend,
) -> Result<(u64, Vec<(String, String)>)> {
    // The span is attached to the awaited futures with `Instrument` rather than entered with
    // a guard: a `Span::enter` guard held across `.await` stays entered while this task is
    // suspended and mis-attributes whatever else runs on the thread in the meantime.
    let span = tracing::info_span!("secretenv.migrate.probe", alias = %plan.alias);
    let start = Instant::now();
    let mut results = Vec::with_capacity(2);

    let source_status = tracing::Instrument::instrument(source.check(), span.clone()).await;
    let source_label = match source_status {
        secretenv_core::BackendStatus::Ok { .. } => "ok".to_owned(),
        secretenv_core::BackendStatus::NotAuthenticated { .. } => {
            "error: not authenticated".to_owned()
        }
        secretenv_core::BackendStatus::CliMissing { .. } => "error: cli missing".to_owned(),
        secretenv_core::BackendStatus::Error { .. } => "error: backend reported error".to_owned(),
    };
    results.push((source.instance_name().to_owned(), source_label));

    let dest_probe =
        tracing::Instrument::instrument(dest.probe_write(&plan.dest_uri), span).await;
    match dest_probe {
        Ok(()) => {
            let dest_label = if dest.has_probe_write() {
                "ok (probed)".to_owned()
            } else {
                "ok (no probe available for this backend)".to_owned()
            };
            results.push((dest.instance_name().to_owned(), dest_label));
        }
        Err(e) => {
            // `results` is discarded on this path, so no label is recorded for the failure;
            // the error itself carries the details.
            return Err(e.context(format!(
                "destination probe rejected migrate {alias}: {dest_instance} cannot write at {dest_uri}",
                alias = plan.alias,
                dest_instance = dest.instance_name(),
                dest_uri = plan.dest_uri.raw,
            )));
        }
    }

    let dur = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
    Ok((dur, results))
}
/// Reads the secret value from the source backend.
///
/// Returns the value together with the elapsed milliseconds (saturating at `u64::MAX`).
///
/// # Errors
///
/// Returns the backend's error with context naming the alias.
async fn migrate_read(plan: &MigrationPlan, source: &dyn Backend) -> Result<(Secret<String>, u64)> {
    // Attach the span to the backend future instead of holding a `Span::enter` guard across
    // the `.await`, which would leave the span entered while this task is suspended.
    let span = tracing::info_span!("secretenv.migrate.read", alias = %plan.alias);
    let start = Instant::now();
    let value = tracing::Instrument::instrument(source.get(&plan.source_uri), span)
        .await
        .with_context(|| format!("reading source value for alias '{}'", plan.alias))?;
    let dur = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
    Ok((value, dur))
}
/// Writes the secret value to the destination backend.
///
/// Returns the elapsed milliseconds (saturating at `u64::MAX`).
///
/// # Errors
///
/// Returns the backend's error with context naming the alias.
async fn migrate_write(
    plan: &MigrationPlan,
    dest: &dyn Backend,
    value: &Secret<String>,
) -> Result<u64> {
    // Attach the span to the backend future instead of holding a `Span::enter` guard across
    // the `.await`, which would leave the span entered while this task is suspended.
    let span = tracing::info_span!("secretenv.migrate.write", alias = %plan.alias);
    let start = Instant::now();
    tracing::Instrument::instrument(dest.write_secret(&plan.dest_uri, value), span)
        .await
        .with_context(|| format!("writing destination value for alias '{}'", plan.alias))?;
    let dur = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
    Ok(dur)
}
/// Re-points the alias at the destination by rewriting the registry document.
///
/// The document at `plan.registry_source_uri` is listed, the alias entry is inserted or
/// replaced with the destination's raw URI, and the whole document is serialized in the
/// backend's registry format and written back.
///
/// NOTE(review): this is a read-modify-write with nothing visible here guarding against a
/// concurrent writer changing the document between the `list` and the `set`.
///
/// # Errors
///
/// Returns an error when the registry's backend instance is not configured, or when
/// listing, serializing, or writing the document fails.
async fn migrate_registry_flip(plan: &MigrationPlan, backends: &BackendRegistry) -> Result<()> {
    // Both backend calls are instrumented with the same span instead of holding a
    // `Span::enter` guard across the `.await`s, which would leave the span entered while
    // this task is suspended.
    let span = tracing::info_span!("secretenv.migrate.pointer_flip", alias = %plan.alias);
    let backend = backend_for(backends, &plan.registry_source_uri)?;

    let current =
        tracing::Instrument::instrument(backend.list(&plan.registry_source_uri), span.clone())
            .await
            .with_context(|| {
                format!("reading registry document at '{}'", plan.registry_source_uri.raw)
            })?;

    // A sorted map keeps one entry per alias and gives the serialized document a stable order.
    let mut map: BTreeMap<String, String> = current.into_iter().collect();
    map.insert(plan.alias.clone(), plan.dest_uri.raw.clone());
    let serialized = secretenv_core::serialize_registry_doc(backend.registry_format(), &map)?;

    tracing::Instrument::instrument(backend.set(&plan.registry_source_uri, &serialized), span)
        .await
        .with_context(|| {
            format!("writing updated registry document to '{}'", plan.registry_source_uri.raw)
        })?;
    Ok(())
}
/// Deletes the secret value from the source backend.
///
/// Returns the elapsed milliseconds (saturating at `u64::MAX`).
///
/// # Errors
///
/// Returns the backend's error with context naming the alias.
async fn migrate_source_delete(plan: &MigrationPlan, source: &dyn Backend) -> Result<u64> {
    // Attach the span to the backend future instead of holding a `Span::enter` guard across
    // the `.await`, which would leave the span entered while this task is suspended.
    let span = tracing::info_span!("secretenv.migrate.source_delete", alias = %plan.alias);
    let start = Instant::now();
    tracing::Instrument::instrument(source.delete_secret(&plan.source_uri), span)
        .await
        .with_context(|| format!("deleting source value for alias '{}'", plan.alias))?;
    let dur = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
    Ok(dur)
}
fn backend_for<'a>(backends: &'a BackendRegistry, uri: &BackendUri) -> Result<&'a dyn Backend> {
backends
.get(&uri.scheme)
.ok_or_else(|| anyhow!("no backend instance '{}' is configured", uri.scheme))
}
/// Decides which registry to resolve, in priority order:
///
/// 1. the explicit `registry` argument (`--registry`),
/// 2. a non-empty `SECRETENV_REGISTRY` environment variable,
/// 3. the `default` entry of `config.registries`.
///
/// For the first two, a value containing `://` is parsed as a URI and anything else is
/// taken as a registry name.
///
/// # Errors
///
/// Returns an error when a URI-shaped selector fails to parse, or when none of the three
/// sources yields a registry.
fn registry_selection(registry: Option<&str>, config: &Config) -> Result<RegistrySelection> {
    // Single classification rule shared by the flag and the environment variable so the two
    // cannot drift apart. `origin` only labels the error message. A `secretenv://` value
    // needs no separate test: it already contains `://`.
    fn classify(raw: &str, origin: &str) -> Result<RegistrySelection> {
        if raw.contains("://") {
            BackendUri::parse(raw)
                .map(RegistrySelection::Uri)
                .with_context(|| format!("{origin} '{raw}' is not a valid URI"))
        } else {
            Ok(RegistrySelection::Name(raw.to_owned()))
        }
    }

    if let Some(value) = registry {
        return classify(value, "--registry");
    }

    // An unset, non-Unicode, or empty variable is treated as "not provided".
    if let Ok(env) = std::env::var("SECRETENV_REGISTRY") {
        if !env.is_empty() {
            return classify(&env, "SECRETENV_REGISTRY");
        }
    }

    if config.registries.contains_key("default") {
        Ok(RegistrySelection::Name("default".to_owned()))
    } else {
        bail!(
            "no registry selected: pass --registry, set SECRETENV_REGISTRY, \
             or define [registries.default] in config.toml"
        )
    }
}
/// Renders the raw URIs of every registry source in the cascade as a `", "`-separated list,
/// for use in error messages.
fn format_sources(aliases: &AliasMap) -> String {
    let mut rendered = String::new();
    for (position, source) in aliases.sources().enumerate() {
        if position > 0 {
            rendered.push_str(", ");
        }
        rendered.push_str(&source.raw);
    }
    rendered
}
/// Generates the identifier for one migrate run: exactly 32 lowercase hex characters.
///
/// Layout, most significant first:
/// - 16 hex chars — nanoseconds since the Unix epoch (`0` if the clock reads before the
///   epoch, saturating at `u64::MAX`),
/// - 8 hex chars — the current process id,
/// - 8 hex chars — a per-process counter incremented on every call.
///
/// The timestamp alone is not unique: clocks with coarse resolution, or two calls in quick
/// succession, can return the same reading. The process id and counter keep ids distinct
/// within a process and make collisions across concurrently running processes unlikely.
fn new_transaction_id() -> String {
    // Wraps on overflow, which is acceptable: the timestamp will have advanced long before
    // 2^32 ids have been issued by one process.
    static SEQUENCE: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);

    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX));
    let pid = std::process::id();
    let seq = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    format!("{nanos:016x}{pid:08x}{seq:08x}")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The transaction id is a fixed-width token made only of hexadecimal digits.
    #[test]
    fn transaction_id_is_32_hex_chars() {
        let txn = new_transaction_id();
        assert_eq!(txn.len(), 32);
        let first_non_hex = txn.chars().find(|c| !c.is_ascii_hexdigit());
        assert!(first_non_hex.is_none());
    }

    /// Every report outcome has the expected telemetry counterpart.
    #[test]
    fn report_outcome_maps_to_telemetry() {
        let expectations = [
            (MigrateReportOutcome::Success, MigrateOutcome::Ok),
            (MigrateReportOutcome::PartialFailurePointerFlip, MigrateOutcome::PartialFailure),
            (
                MigrateReportOutcome::SourceDeleteFailedPostCommit,
                MigrateOutcome::OkWithCleanupFailure,
            ),
            (MigrateReportOutcome::DryRun, MigrateOutcome::DryRun),
        ];
        for (report_outcome, telemetry_outcome) in expectations {
            assert_eq!(report_outcome.as_telemetry(), telemetry_outcome);
        }
    }

    /// The rendered error names the alias but never the destination URI or the delete hint.
    #[test]
    fn pointer_flip_failed_display_omits_uri_body() {
        let failure = PointerFlipFailed {
            alias: "stripe-key".to_owned(),
            dest_uri_raw: "vault-prod://secret/payments/stripe_key".to_owned(),
            dest_delete_hint: "VAULT_ADDR=… vault kv delete …".to_owned(),
        };
        let rendered = failure.to_string();
        assert!(rendered.contains("stripe-key"), "{rendered}");
        assert!(!rendered.contains("vault-prod://"), "leaked URI: {rendered}");
        assert!(!rendered.contains("vault kv delete"), "leaked hint: {rendered}");
    }
}