use base64::Engine;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use reqwest::StatusCode;
use reqwest_middleware::ClientWithMiddleware;
use serde::{Deserialize, Serialize};
use std::env;
use std::fmt::Display;
use thiserror::Error;
use uv_redacted::{DisplaySafeUrl, DisplaySafeUrlError};
use uv_static::EnvVars;
pub(crate) mod pypi;
pub(crate) mod pyx;
/// Errors that can occur while obtaining a trusted publishing token.
#[derive(Debug, Error)]
pub enum TrustedPublishingError {
    /// A [`DisplaySafeUrlError`], reported transparently.
    #[error(transparent)]
    Url(#[from] DisplaySafeUrlError),
    /// OIDC token detection failed because GitHub Actions reported insufficient permissions.
    #[error("Failed to obtain OIDC token: is the `id-token: write` permission missing?")]
    GitHubPermissions(#[source] ambient_id::Error),
    /// OIDC token detection failed for any reason other than insufficient GitHub permissions.
    #[error("Failed to discover OIDC token")]
    Discovery(#[source] ambient_id::Error),
    /// No OIDC token was discovered in the current environment.
    #[error("No OIDC token discovered: are you in a supported trusted publishing environment?")]
    NoToken,
    /// A request to the given URL failed with a `reqwest` error.
    #[error("Failed to fetch: `{0}`")]
    Reqwest(DisplaySafeUrl, #[source] reqwest::Error),
    /// A request to the given URL failed with a `reqwest_middleware` error.
    #[error("Failed to fetch: `{0}`")]
    ReqwestMiddleware(DisplaySafeUrl, #[source] reqwest_middleware::Error),
    /// A `serde_json` error, reported transparently.
    #[error(transparent)]
    SerdeJson(#[from] serde_json::error::Error),
    /// The server answered with an error status.
    ///
    /// Fields, in order: the status code, the response text, and the claims of the OIDC token
    /// that was sent. The claims are pretty-printed through their `Debug` implementation so the
    /// user can compare them against the publisher configuration.
    #[error(
        "Server returned error code {0}, is trusted publishing correctly configured?\nResponse: {1}\nToken claims, which must match the publisher configuration: {2:#?}"
    )]
    TokenRejected(StatusCode, String, OidcTokenClaims),
    /// The server answered with an error status and no claims are available to report.
    ///
    /// Fields, in order: the status code and the response text.
    /// NOTE(review): presumably raised when the OIDC token's claims could not be decoded;
    /// confirm against the call sites, which are not visible here.
    #[error(
        "Server returned error code {0}, and the OIDC has an unexpected format.\nResponse: {1}"
    )]
    InvalidOidcToken(StatusCode, String),
    /// The given upload URL was not recognized as a pyx upload URL.
    #[error("The upload URL `{0}` does not look like a valid pyx upload URL")]
    InvalidPyxUploadUrl(DisplaySafeUrl),
}
/// A publish token obtained through trusted publishing.
///
/// Because of `#[serde(transparent)]`, this deserializes directly from a bare string rather than
/// from a wrapping structure.
#[derive(Deserialize)]
#[serde(transparent)]
pub struct TrustedPublishingToken(String);
/// Renders the token as its raw string, with no prefix, suffix, or quoting.
impl Display for TrustedPublishingToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Hand the inner string straight to the formatter's output.
        f.write_str(&self.0)
    }
}
/// Deserialized payload carrying a single `audience` string.
///
/// NOTE(review): not used in the visible part of this file; presumably consumed by the
/// `pypi`/`pyx` submodules when asking a registry for its OIDC audience. Confirm there.
#[derive(Deserialize)]
struct Audience {
    audience: String,
}
/// Serializable request payload carrying a single `token` string.
///
/// NOTE(review): not used in the visible part of this file; presumably the body sent when
/// exchanging an OIDC token for a publish token. Confirm in the submodules.
#[derive(Serialize)]
struct MintTokenRequest {
    token: String,
}
/// Deserialized payload whose `token` field holds a [`TrustedPublishingToken`].
///
/// NOTE(review): not used in the visible part of this file; presumably the response of the
/// token exchange. Confirm in the submodules.
#[derive(Deserialize)]
struct PublishToken {
    token: TrustedPublishingToken,
}
/// Claims extracted from an OIDC token, one variant per supported token shape.
///
/// The enum is `#[serde(untagged)]`: there is no discriminator in the data, and serde tries the
/// variants in declaration order, keeping the first one that deserializes successfully. Do not
/// reorder the variants without checking that each payload still lands on the intended one.
#[derive(Deserialize, Debug)]
#[allow(dead_code)]
#[serde(untagged)]
pub enum OidcTokenClaims {
    GitHub(GitHubTokenClaims),
    GitLab(GitLabTokenClaims),
    Buildkite(BuildkiteTokenClaims),
}
/// The claims captured for the [`OidcTokenClaims::GitHub`] variant.
///
/// Every field except `environment` must be present for deserialization to succeed.
/// The fields are not read individually in the visible code (hence `dead_code` is allowed);
/// they are exposed through the derived `Debug` output, which
/// [`TrustedPublishingError::TokenRejected`] includes in its message.
#[derive(Deserialize, Debug)]
#[allow(dead_code)]
pub struct GitHubTokenClaims {
    sub: String,
    repository: String,
    repository_owner: String,
    repository_owner_id: String,
    job_workflow_ref: String,
    r#ref: String,
    // Optional: a payload without this claim still deserializes, with `None` here.
    environment: Option<String>,
}
/// The claims captured for the [`OidcTokenClaims::GitLab`] variant.
///
/// Every field except `environment` must be present for deserialization to succeed.
/// The fields are not read individually in the visible code (hence `dead_code` is allowed);
/// they are exposed through the derived `Debug` output, which
/// [`TrustedPublishingError::TokenRejected`] includes in its message.
#[derive(Deserialize, Debug)]
#[allow(dead_code)]
pub struct GitLabTokenClaims {
    sub: String,
    project_path: String,
    ci_config_ref_uri: String,
    // Optional: a payload without this claim still deserializes, with `None` here.
    environment: Option<String>,
}
/// The claims captured for the [`OidcTokenClaims::Buildkite`] variant.
///
/// All fields must be present for deserialization to succeed.
/// The fields are not read individually in the visible code (hence `dead_code` is allowed);
/// they are exposed through the derived `Debug` output, which
/// [`TrustedPublishingError::TokenRejected`] includes in its message.
#[derive(Deserialize, Debug)]
#[allow(dead_code)]
pub struct BuildkiteTokenClaims {
    sub: String,
    pipeline_slug: String,
    organization_slug: String,
}
/// A service that can turn an ambient OIDC identity token into a publish token.
///
/// Implementors supply the HTTP client, the audience, and the exchange step;
/// [`TrustedPublishingService::get_token`] ties them together.
pub(crate) trait TrustedPublishingService {
    /// The HTTP client handed to OIDC token discovery.
    fn client(&self) -> &ClientWithMiddleware;

    /// The audience the OIDC token is requested for.
    async fn audience(&self) -> Result<String, TrustedPublishingError>;

    /// Exchanges a discovered OIDC token for a publish token.
    async fn exchange_token(
        &self,
        oidc_token: ambient_id::IdToken,
    ) -> Result<TrustedPublishingToken, TrustedPublishingError>;

    /// Runs the full flow: fetch the audience, discover an OIDC token, exchange it.
    ///
    /// Returns `Ok(None)` when discovery finds no OIDC token, in which case no exchange is
    /// attempted. Errors from any of the three steps are propagated unchanged.
    ///
    /// When the `GITHUB_ACTIONS` environment variable is exactly `true`, the publish token is
    /// printed to stdout in an `::add-mask::` workflow command before being returned.
    async fn get_token(&self) -> Result<Option<TrustedPublishingToken>, TrustedPublishingError> {
        let audience = self.audience().await?;

        // Nothing to exchange if discovery came back empty.
        let Some(oidc_token) = get_oidc_token(&audience, self.client()).await? else {
            return Ok(None);
        };
        let publish_token = self.exchange_token(oidc_token).await?;

        // The workflow command has to go to stdout, so the lint is expected here.
        #[expect(clippy::print_stdout)]
        if matches!(env::var(EnvVars::GITHUB_ACTIONS).as_deref(), Ok("true")) {
            println!("::add-mask::{publish_token}");
        }

        Ok(Some(publish_token))
    }
}
async fn get_oidc_token(
audience: &str,
client: &ClientWithMiddleware,
) -> Result<Option<ambient_id::IdToken>, TrustedPublishingError> {
let detector = ambient_id::Detector::new_with_client(client.clone());
match detector.detect(audience).await {
Ok(token) => Ok(token),
Err(
err @ ambient_id::Error::GitHubActions(
ambient_id::GitHubError::InsufficientPermissions(_),
),
) => Err(TrustedPublishingError::GitHubPermissions(err)),
Err(err) => Err(TrustedPublishingError::Discovery(err)),
}
}
/// Decodes the claims from the payload segment of an OIDC token.
///
/// The token is split on `.` into header, payload, and signature segments; only the payload is
/// inspected. The signature is not verified here.
///
/// Returns `None` if the token has fewer than three segments, if the payload is not valid
/// unpadded URL-safe base64, or if the decoded JSON matches no [`OidcTokenClaims`] variant.
fn decode_oidc_token(oidc_token: &str) -> Option<OidcTokenClaims> {
    // Split into at most three pieces; any further dots stay inside the ignored last segment.
    // The segments are taken directly from the iterator, so no intermediate collection is
    // allocated just to destructure them.
    let mut segments = oidc_token.splitn(3, '.');
    let (Some(_header), Some(payload), Some(_signature)) =
        (segments.next(), segments.next(), segments.next())
    else {
        return None;
    };
    let decoded = BASE64_URL_SAFE_NO_PAD.decode(payload).ok()?;
    serde_json::from_slice(&decoded).ok()
}