use tracing::{debug, trace};
use url::Url;
use uv_redacted::DisplaySafeUrl;
use crate::trusted_publishing::{
Audience, MintTokenRequest, PublishToken, TrustedPublishingError, TrustedPublishingService,
TrustedPublishingToken, decode_oidc_token,
};
/// Trusted publishing service implementation for a pyx registry.
///
/// Holds borrowed handles only; it owns neither the HTTP client nor the registry URL.
pub(crate) struct PyxPublishingService<'a> {
    /// HTTP client used for the audience and mint-token requests.
    client: &'a reqwest_middleware::ClientWithMiddleware,
    /// Registry URL; its authority and path segments are used to derive the
    /// trusted publishing endpoint URLs.
    registry: &'a uv_redacted::DisplaySafeUrl,
}
impl<'a> PyxPublishingService<'a> {
    /// Build a service for `registry`, borrowing the raw middleware client that
    /// `client` hands out for that registry URL via `for_host(..).raw_client()`.
    pub(crate) fn new(
        registry: &'a uv_redacted::DisplaySafeUrl,
        client: &'a uv_client::BaseClient,
    ) -> Self {
        let client = client.for_host(registry).raw_client();
        Self { client, registry }
    }
}
impl TrustedPublishingService for PyxPublishingService<'_> {
    /// The HTTP client this service sends its requests with.
    fn client(&self) -> &reqwest_middleware::ClientWithMiddleware {
        self.client
    }

    /// Query the registry for the OIDC audience to request a token for.
    ///
    /// Sends a `GET` to `{scheme}://{authority}/v1/trusted-publishing/audience` and
    /// returns the `audience` field of the JSON response.
    ///
    /// # Errors
    ///
    /// Fails if the audience URL cannot be parsed, the request cannot be sent, the
    /// response has an error status, or the body cannot be decoded as [`Audience`].
    async fn audience(&self) -> Result<String, TrustedPublishingError> {
        // Outside of the `test` feature the scheme is always `https`, regardless of the
        // scheme of the registry URL.
        let scheme: &str = if cfg!(feature = "test") {
            self.registry.scheme()
        } else {
            "https"
        };
        let audience_url = DisplaySafeUrl::parse(&format!(
            "{scheme}://{}/v1/trusted-publishing/audience",
            self.registry.authority()
        ))?;
        debug!("Querying the trusted publishing audience from {audience_url}");

        let response = self
            .client
            .get(Url::from(audience_url.clone()))
            .send()
            .await
            .map_err(|err| TrustedPublishingError::ReqwestMiddleware(audience_url.clone(), err))?;
        let audience = response
            .error_for_status()
            .map_err(|err| TrustedPublishingError::Reqwest(audience_url.clone(), err))?
            .json::<Audience>()
            .await
            .map_err(|err| TrustedPublishingError::Reqwest(audience_url.clone(), err))?;
        trace!("The audience is `{}`", &audience.audience);
        Ok(audience.audience)
    }

    /// Exchange an ambient OIDC token for a publish token.
    ///
    /// The registry URL path must be `/v1/upload/{workspace}/{registry}`, optionally
    /// followed by a trailing slash. The token is posted as a JSON-serialized
    /// [`MintTokenRequest`] to
    /// `{scheme}://{authority}/v1/trusted-publishing/{workspace}/{registry}/mint-token`.
    ///
    /// # Errors
    ///
    /// - [`TrustedPublishingError::InvalidPyxUploadUrl`] if the registry path does not
    ///   have the expected shape.
    /// - [`TrustedPublishingError::ReqwestMiddleware`] / [`TrustedPublishingError::Reqwest`]
    ///   if sending the request or reading the response body fails.
    /// - [`TrustedPublishingError::TokenRejected`] if the server answers with a
    ///   non-success status and the OIDC token's claims could be decoded.
    /// - [`TrustedPublishingError::InvalidOidcToken`] if the server answers with a
    ///   non-success status and the OIDC token could not be decoded.
    /// - A conversion error if the mint-token URL cannot be parsed or a successful
    ///   response body cannot be deserialized as [`PublishToken`].
    async fn exchange_token(
        &self,
        oidc_token: ambient_id::IdToken,
    ) -> Result<TrustedPublishingToken, TrustedPublishingError> {
        // Outside of the `test` feature the scheme is always `https`, regardless of the
        // scheme of the registry URL.
        let scheme: &str = if cfg!(feature = "test") {
            self.registry.scheme()
        } else {
            "https"
        };

        let path_segments: Vec<&str> = self
            .registry
            .path_segments()
            .map_or(Vec::new(), std::iter::Iterator::collect);
        // Path segments are split on `/`, so a trailing slash shows up as a final
        // *empty* segment, never as a literal "/" segment.
        let (["v1", "upload", workspace_name, registry_name]
        | ["v1", "upload", workspace_name, registry_name, ""]) = path_segments[..]
        else {
            return Err(TrustedPublishingError::InvalidPyxUploadUrl(
                self.registry.clone(),
            ));
        };

        let mint_token_url = DisplaySafeUrl::parse(&format!(
            "{scheme}://{}/v1/trusted-publishing/{workspace_name}/{registry_name}/mint-token",
            self.registry.authority()
        ))?;
        debug!("Querying the trusted publishing upload token from {mint_token_url}");

        let mint_token_payload = MintTokenRequest {
            token: oidc_token.reveal().to_string(),
        };
        let response = self
            .client
            .post(Url::from(mint_token_url.clone()))
            .json(&mint_token_payload)
            .send()
            .await
            .map_err(|err| {
                TrustedPublishingError::ReqwestMiddleware(mint_token_url.clone(), err)
            })?;

        // Capture the status before consuming the response: the body is needed both
        // for the success payload and for the error message.
        let status = response.status();
        let body = response
            .bytes()
            .await
            .map_err(|err| TrustedPublishingError::Reqwest(mint_token_url.clone(), err))?;

        if status.is_success() {
            let publish_token: PublishToken = serde_json::from_slice(&body)?;
            Ok(publish_token.token)
        } else {
            let message = String::from_utf8_lossy(&body).into_owned();
            // Attach the decoded claims to the error when the token is decodable.
            match decode_oidc_token(oidc_token.reveal()) {
                Some(claims) => Err(TrustedPublishingError::TokenRejected(
                    status, message, claims,
                )),
                None => Err(TrustedPublishingError::InvalidOidcToken(status, message)),
            }
        }
    }
}