pub mod global_id;
pub mod gql;
pub mod stream;
use std::{collections::HashSet, time::Duration};
use anyhow::Context;
use cynic::{http::CynicReqwestError, GraphQlResponse, MutationBuilder, Operation, QueryBuilder};
use futures::StreamExt;
use time::OffsetDateTime;
use tracing::Instrument;
use url::Url;
use wasmer_deploy_schema::schema::{DeploymentV1, NetworkTokenV1, WebcPackageIdentifierV1};
use crate::backend::gql::Log;
use self::gql::{
CreateNamespaceVars, DeployApp, DeployAppConnection, DeployAppVersion,
DeployAppVersionConnection, GetDeployAppAndVersion, GetDeployAppVersionsVars,
GetNamespaceAppsVars, PackageVersionConnection, PublishDeployAppVars,
};
const ENDPOINT_DEV: &str = "https://registry.wapm.dev/graphql";
const ENDPOINT_PROD: &str = "https://registry.wapm.io/graphql";
pub fn endpoint_dev() -> Url {
Url::parse(ENDPOINT_DEV).unwrap()
}
pub fn endpoint_prod() -> Url {
Url::parse(ENDPOINT_PROD).unwrap()
}
#[derive(Clone, Debug)]
pub struct BackendClient {
auth_token: Option<String>,
graphql_endpoint: Url,
client: reqwest::Client,
#[allow(unused)]
extra_debugging: bool,
}
impl BackendClient {
pub fn with_client(client: reqwest::Client, graphql_endpoint: Url) -> Self {
Self {
client,
auth_token: None,
graphql_endpoint,
extra_debugging: false,
}
}
pub fn graphql_endpoint(&self) -> &Url {
&self.graphql_endpoint
}
pub fn auth_token(&self) -> Option<&str> {
self.auth_token.as_deref()
}
pub fn new(graphql_endpoint: Url) -> Self {
Self {
client: reqwest::Client::new(),
auth_token: None,
graphql_endpoint,
extra_debugging: false,
}
}
pub fn with_auth_token(mut self, auth_token: String) -> Self {
self.auth_token = Some(auth_token);
self
}
pub async fn run_graphql_raw<ResponseData, Vars>(
&self,
operation: Operation<ResponseData, Vars>,
) -> Result<cynic::GraphQlResponse<ResponseData>, anyhow::Error>
where
Vars: serde::Serialize + std::fmt::Debug,
ResponseData: serde::de::DeserializeOwned + std::fmt::Debug + 'static,
{
let req = self.client.post(self.graphql_endpoint.as_str());
let b = if let Some(token) = &self.auth_token {
req.bearer_auth(token)
} else {
req
};
if self.extra_debugging {
tracing::trace!(
query=%operation.query,
vars=?operation.variables,
"running GraphQL query"
);
}
let query = operation.query.clone();
let res = b.json(&operation).send().await;
let res = match res {
Ok(response) => {
let status = response.status();
if !status.is_success() {
let body_string = match response.text().await {
Ok(b) => b,
Err(err) => {
tracing::error!("could not load response body: {err}");
"<could not retrieve body>".to_string()
}
};
match serde_json::from_str::<GraphQlResponse<ResponseData>>(&body_string) {
Ok(response) => Ok(response),
Err(_) => Err(CynicReqwestError::ErrorResponse(status, body_string)),
}
} else {
let body = response.bytes().await?;
let jd = &mut serde_json::Deserializer::from_slice(&body);
let data: Result<GraphQlResponse<ResponseData>, _> =
serde_path_to_error::deserialize(jd).map_err(|err| {
let body_txt = String::from_utf8_lossy(&body);
CynicReqwestError::ErrorResponse(
reqwest::StatusCode::INTERNAL_SERVER_ERROR,
format!("Could not decode JSON response: {err} -- '{body_txt}'"),
)
});
data
}
}
Err(e) => Err(CynicReqwestError::ReqwestError(e)),
};
let res = res?;
if let Some(errors) = &res.errors {
if !errors.is_empty() {
tracing::warn!(
?errors,
data=?res.data,
%query,
endpoint=%self.graphql_endpoint,
"GraphQL query succeeded, but returned errors",
);
}
}
Ok(res)
}
pub async fn run_graphql<ResponseData, Vars>(
&self,
operation: Operation<ResponseData, Vars>,
) -> Result<ResponseData, anyhow::Error>
where
Vars: serde::Serialize + std::fmt::Debug,
ResponseData: serde::de::DeserializeOwned + std::fmt::Debug + 'static,
{
let res = self.run_graphql_raw(operation).await?;
if let Some(data) = res.data {
Ok(data)
} else if let Some(errs) = res.errors {
let errs = GraphQLApiFailure { errors: errs };
Err(errs).context("GraphQL query failed")
} else {
Err(anyhow::anyhow!("Query did not return any data"))
}
}
pub async fn run_graphql_strict<ResponseData, Vars>(
&self,
operation: Operation<ResponseData, Vars>,
) -> Result<ResponseData, anyhow::Error>
where
Vars: serde::Serialize + std::fmt::Debug,
ResponseData: serde::de::DeserializeOwned + std::fmt::Debug + 'static,
{
let res = self.run_graphql_raw(operation).await?;
if let Some(errs) = res.errors {
if !errs.is_empty() {
let errs = GraphQLApiFailure { errors: errs };
return Err(errs).context("GraphQL query failed");
}
}
if let Some(data) = res.data {
Ok(data)
} else {
Err(anyhow::anyhow!("Query did not return any data"))
}
}
}
#[derive(Debug)]
pub struct GraphQLApiFailure {
pub errors: Vec<cynic::GraphQlError>,
}
impl GraphQLApiFailure {
pub fn from_errors(
msg: impl Into<String>,
errors: Option<Vec<cynic::GraphQlError>>,
) -> anyhow::Error {
let msg = msg.into();
if let Some(errs) = errors {
if !errs.is_empty() {
let err = GraphQLApiFailure { errors: errs };
return anyhow::Error::new(err).context(msg);
}
}
anyhow::anyhow!("{msg} - query did not return any data")
}
}
impl std::fmt::Display for GraphQLApiFailure {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let errs = self
.errors
.iter()
.map(|err| err.to_string())
.collect::<Vec<_>>()
.join(", ");
write!(f, "GraphQL API failure: {}", errs)
}
}
impl std::error::Error for GraphQLApiFailure {}
pub async fn fetch_webc_package(
client: &BackendClient,
ident: &WebcPackageIdentifierV1,
default_registry: &Url,
) -> Result<webc::compat::Container, anyhow::Error> {
let url = ident.build_download_url_with_default_registry(default_registry);
let data = client
.client
.get(url)
.header(reqwest::header::ACCEPT, "application/webc")
.send()
.await?
.error_for_status()?
.bytes()
.await?;
webc::compat::Container::from_bytes(data).context("failed to parse webc package")
}
pub async fn current_user_with_namespaces(
client: &BackendClient,
namespace_role: Option<gql::GrapheneRole>,
) -> Result<gql::UserWithNamespaces, anyhow::Error> {
client
.run_graphql(gql::GetCurrentUser::build(gql::GetCurrentUserVars {
namespace_role,
}))
.await?
.viewer
.context("not logged in")
}
pub async fn get_app(
client: &BackendClient,
owner: String,
name: String,
) -> Result<Option<gql::DeployApp>, anyhow::Error> {
client
.run_graphql(gql::GetDeployApp::build(gql::GetDeployAppVars {
name,
owner,
}))
.await
.map(|x| x.get_deploy_app)
}
pub async fn get_app_by_alias(
client: &BackendClient,
alias: String,
) -> Result<Option<gql::DeployApp>, anyhow::Error> {
client
.run_graphql(gql::GetDeployAppByAlias::build(
gql::GetDeployAppByAliasVars { alias },
))
.await
.map(|x| x.get_app_by_global_alias)
}
pub async fn get_app_version(
client: &BackendClient,
owner: String,
name: String,
version: String,
) -> Result<Option<gql::DeployAppVersion>, anyhow::Error> {
client
.run_graphql(gql::GetDeployAppVersion::build(
gql::GetDeployAppVersionVars {
name,
owner,
version,
},
))
.await
.map(|x| x.get_deploy_app_version)
}
pub async fn get_app_with_version(
client: &BackendClient,
owner: String,
name: String,
version: String,
) -> Result<GetDeployAppAndVersion, anyhow::Error> {
client
.run_graphql(gql::GetDeployAppAndVersion::build(
gql::GetDeployAppAndVersionVars {
name,
owner,
version,
},
))
.await
}
pub async fn get_app_and_package_by_name(
client: &BackendClient,
vars: gql::GetPackageAndAppVars,
) -> Result<(Option<gql::Package>, Option<gql::DeployApp>), anyhow::Error> {
let res = client
.run_graphql(gql::GetPackageAndApp::build(vars))
.await?;
Ok((res.get_package, res.get_deploy_app))
}
pub async fn get_deploy_apps(
client: &BackendClient,
vars: gql::GetDeployAppsVars,
) -> Result<DeployAppConnection, anyhow::Error> {
let res = client.run_graphql(gql::GetDeployApps::build(vars)).await?;
res.get_deploy_apps.context("no apps returned")
}
pub fn get_deploy_apps_stream(
client: &BackendClient,
vars: gql::GetDeployAppsVars,
) -> impl futures::Stream<Item = Result<Vec<DeployApp>, anyhow::Error>> + '_ {
futures::stream::try_unfold(
Some(vars),
move |vars: Option<gql::GetDeployAppsVars>| async move {
let vars = match vars {
Some(vars) => vars,
None => return Ok(None),
};
let page = get_deploy_apps(client, vars.clone()).await?;
let end_cursor = page.page_info.end_cursor;
let items = page
.edges
.into_iter()
.filter_map(|x| x.and_then(|x| x.node))
.collect::<Vec<_>>();
let new_vars = end_cursor.map(|c| gql::GetDeployAppsVars {
after: Some(c),
..vars
});
Ok(Some((items, new_vars)))
},
)
}
pub async fn get_deploy_app_versions(
client: &BackendClient,
vars: GetDeployAppVersionsVars,
) -> Result<DeployAppVersionConnection, anyhow::Error> {
let res = client
.run_graphql_strict(gql::GetDeployAppVersions::build(vars))
.await?;
let versions = res.get_deploy_app.context("app not found")?.versions;
Ok(versions)
}
pub async fn get_app_by_id(
client: &BackendClient,
app_id: String,
) -> Result<DeployApp, anyhow::Error> {
client
.run_graphql(gql::GetDeployAppById::build(gql::GetDeployAppByIdVars {
app_id: app_id.into(),
}))
.await?
.app
.context("app not found")?
.into_deploy_app()
.context("app conversion failed")
}
pub async fn get_node(
client: &BackendClient,
id: String,
) -> Result<Option<gql::Node>, anyhow::Error> {
client
.run_graphql(gql::GetNode::build(gql::GetNodeVars { id: id.into() }))
.await
.map(|x| x.node)
}
pub async fn get_app_with_version_by_id(
client: &BackendClient,
app_id: String,
version_id: String,
) -> Result<(DeployApp, DeployAppVersion), anyhow::Error> {
let res = client
.run_graphql(gql::GetDeployAppAndVersionById::build(
gql::GetDeployAppAndVersionByIdVars {
app_id: app_id.into(),
version_id: version_id.into(),
},
))
.await?;
let app = res
.app
.context("app not found")?
.into_deploy_app()
.context("app conversion failed")?;
let version = res
.version
.context("version not found")?
.into_deploy_app_version()
.context("version conversion failed")?;
Ok((app, version))
}
pub async fn get_app_version_by_id(
client: &BackendClient,
version_id: String,
) -> Result<DeployAppVersion, anyhow::Error> {
client
.run_graphql(gql::GetDeployAppVersionById::build(
gql::GetDeployAppVersionByIdVars {
version_id: version_id.into(),
},
))
.await?
.version
.context("app not found")?
.into_deploy_app_version()
.context("app version conversion failed")
}
pub async fn get_app_version_by_id_with_app(
client: &BackendClient,
version_id: String,
) -> Result<(DeployApp, DeployAppVersion), anyhow::Error> {
let version = client
.run_graphql(gql::GetDeployAppVersionById::build(
gql::GetDeployAppVersionByIdVars {
version_id: version_id.into(),
},
))
.await?
.version
.context("app not found")?
.into_deploy_app_version()
.context("app version conversion failed")?;
let app_id = version
.app
.as_ref()
.context("could not load app for version")?
.id
.clone();
let app = get_app_by_id(client, app_id.into_inner()).await?;
Ok((app, version))
}
pub async fn user_apps(client: &BackendClient) -> Result<Vec<gql::DeployApp>, anyhow::Error> {
let user = client
.run_graphql(gql::GetCurrentUserWithApps::build(()))
.await?
.viewer
.context("not logged in")?;
let apps = user
.apps
.edges
.into_iter()
.flatten()
.filter_map(|x| x.node)
.collect();
Ok(apps)
}
pub async fn user_accessible_apps(
client: &BackendClient,
) -> Result<Vec<gql::DeployApp>, anyhow::Error> {
let mut apps = Vec::new();
let user_apps = user_apps(client).await?;
apps.extend(user_apps);
let namespace_res = client
.run_graphql(gql::GetCurrentUser::build(gql::GetCurrentUserVars {
namespace_role: None,
}))
.await?;
let active_user = namespace_res.viewer.context("not logged in")?;
let namespace_names = active_user
.namespaces
.edges
.iter()
.filter_map(|edge| edge.as_ref())
.filter_map(|edge| edge.node.as_ref())
.map(|node| node.name.clone())
.collect::<Vec<_>>();
for namespace in namespace_names {
let out = client
.run_graphql(gql::GetNamespaceApps::build(GetNamespaceAppsVars {
name: namespace.to_string(),
}))
.await?;
if let Some(ns) = out.get_namespace {
let ns_apps = ns.apps.edges.into_iter().flatten().filter_map(|x| x.node);
apps.extend(ns_apps);
}
}
Ok(apps)
}
pub async fn namespace_apps(
client: &BackendClient,
namespace: &str,
) -> Result<Vec<gql::DeployApp>, anyhow::Error> {
let res = client
.run_graphql(gql::GetNamespaceApps::build(GetNamespaceAppsVars {
name: namespace.to_string(),
}))
.await?;
let ns = res
.get_namespace
.with_context(|| format!("failed to get namespace '{}'", namespace))?;
let apps = ns
.apps
.edges
.into_iter()
.flatten()
.filter_map(|x| x.node)
.collect();
Ok(apps)
}
pub async fn publish_deploy_app(
client: &BackendClient,
vars: PublishDeployAppVars,
) -> Result<DeployAppVersion, anyhow::Error> {
let res = client
.run_graphql_raw(gql::PublishDeployApp::build(vars))
.await?;
if let Some(app) = res
.data
.and_then(|d| d.publish_deploy_app)
.map(|d| d.deploy_app_version)
{
Ok(app)
} else {
Err(GraphQLApiFailure::from_errors(
"could not publish app",
res.errors,
))
}
}
pub async fn user_namespaces(client: &BackendClient) -> Result<Vec<gql::Namespace>, anyhow::Error> {
let user = client
.run_graphql(gql::GetCurrentUser::build(gql::GetCurrentUserVars {
namespace_role: None,
}))
.await?
.viewer
.context("not logged in")?;
let ns = user
.namespaces
.edges
.into_iter()
.flatten()
.filter_map(|x| x.node)
.collect();
Ok(ns)
}
pub async fn get_namespace(
client: &BackendClient,
name: String,
) -> Result<Option<gql::Namespace>, anyhow::Error> {
client
.run_graphql(gql::GetNamespace::build(gql::GetNamespaceVars { name }))
.await
.map(|x| x.get_namespace)
}
pub async fn create_namespace(
client: &BackendClient,
vars: CreateNamespaceVars,
) -> Result<gql::Namespace, anyhow::Error> {
client
.run_graphql(gql::CreateNamespace::build(vars))
.await?
.create_namespace
.map(|x| x.namespace)
.context("no namespace returned")
}
pub async fn get_package(
client: &BackendClient,
name: String,
) -> Result<Option<gql::Package>, anyhow::Error> {
client
.run_graphql_strict(gql::GetPackage::build(gql::GetPackageVars { name }))
.await
.map(|x| x.get_package)
}
pub async fn get_package_version(
client: &BackendClient,
name: String,
version: String,
) -> Result<Option<gql::PackageVersionWithPackage>, anyhow::Error> {
client
.run_graphql_strict(gql::GetPackageVersion::build(gql::GetPackageVersionVars {
name,
version,
}))
.await
.map(|x| x.get_package_version)
}
pub async fn get_package_versions(
client: &BackendClient,
vars: gql::AllPackageVersionsVars,
) -> Result<PackageVersionConnection, anyhow::Error> {
let res = client
.run_graphql(gql::GetAllPackageVersions::build(vars))
.await?;
Ok(res.all_package_versions)
}
pub fn get_package_versions_stream(
client: &BackendClient,
vars: gql::AllPackageVersionsVars,
) -> impl futures::Stream<Item = Result<Vec<gql::PackageVersionWithPackage>, anyhow::Error>> + '_ {
futures::stream::try_unfold(
Some(vars),
move |vars: Option<gql::AllPackageVersionsVars>| async move {
let vars = match vars {
Some(vars) => vars,
None => return Ok(None),
};
let page = get_package_versions(client, vars.clone()).await?;
let end_cursor = page.page_info.end_cursor;
let items = page
.edges
.into_iter()
.filter_map(|x| x.and_then(|x| x.node))
.collect::<Vec<_>>();
let new_vars = end_cursor.map(|cursor| gql::AllPackageVersionsVars {
after: Some(cursor),
..vars
});
Ok(Some((items, new_vars)))
},
)
}
pub async fn generate_deploy_token_raw(
client: &BackendClient,
app_version_id: String,
) -> Result<String, anyhow::Error> {
let res = client
.run_graphql(gql::GenerateDeployToken::build(
gql::GenerateDeployTokenVars { app_version_id },
))
.await?;
res.generate_deploy_token
.map(|x| x.token)
.context("no token returned")
}
#[derive(Debug, PartialEq)]
pub enum GenerateTokenBy {
Id(NetworkTokenV1),
}
#[derive(Debug, PartialEq)]
pub enum TokenKind {
SSH,
Network(GenerateTokenBy),
Other(Box<DeploymentV1>),
}
pub async fn generate_deploy_config_token_raw(
client: &BackendClient,
token_kind: TokenKind,
) -> Result<String, anyhow::Error> {
let res = client
.run_graphql(gql::GenerateDeployConfigToken::build(
gql::GenerateDeployConfigTokenVars {
input: match token_kind {
TokenKind::SSH => "{}".to_string(),
TokenKind::Network(by) => match by {
GenerateTokenBy::Id(token) => serde_json::to_string(&token)?,
},
TokenKind::Other(deploy) => serde_json::to_string(&deploy)?,
},
},
))
.await?;
res.generate_deploy_config_token
.map(|x| x.token)
.context("no token returned")
}
#[tracing::instrument(skip_all, level = "debug")]
#[allow(clippy::let_with_type_underscore)]
fn get_app_logs(
client: &BackendClient,
name: String,
owner: String,
tag: Option<String>,
start: OffsetDateTime,
end: Option<OffsetDateTime>,
) -> impl futures::Stream<Item = Result<Vec<Log>, anyhow::Error>> + '_ {
let span = tracing::Span::current();
futures::stream::try_unfold(start, move |start| {
let variables = gql::GetDeployAppLogsVars {
name: name.clone(),
owner: owner.clone(),
version: tag.clone(),
first: Some(10),
starting_from: unix_timestamp(start),
until: end.map(unix_timestamp),
};
let fut = async move {
let deploy_app_version = client
.run_graphql(gql::GetDeployAppLogs::build(variables))
.await?
.get_deploy_app_version
.context("unknown package version")?;
let page: Vec<_> = deploy_app_version
.logs
.edges
.into_iter()
.flatten()
.filter_map(|edge| edge.node)
.collect();
if page.is_empty() {
Ok(None)
} else {
let last_message = page.last().expect("The page is non-empty");
let timestamp = last_message.timestamp;
let timestamp = OffsetDateTime::from_unix_timestamp_nanos(timestamp as i128)
.with_context(|| {
format!("Unable to interpret {timestamp} as a unix timestamp")
})?;
let next_timestamp = timestamp + Duration::from_nanos(1_000);
Ok(Some((page, next_timestamp)))
}
};
fut.instrument(span.clone())
})
}
#[tracing::instrument(skip_all, level = "debug")]
#[allow(clippy::let_with_type_underscore)]
pub async fn get_app_logs_paginated(
client: &BackendClient,
name: String,
owner: String,
tag: Option<String>,
start: OffsetDateTime,
end: Option<OffsetDateTime>,
max_lines: usize,
) -> Result<Vec<Log>, anyhow::Error> {
let mut logs = Vec::new();
let stream = get_app_logs(client, name, owner, tag, start, end);
futures::pin_mut!(stream);
let mut hasher = HashSet::new();
while let Some(res) = stream.next().await {
let mut page = res?;
page.retain(|log| hasher.insert((log.message.clone(), log.timestamp.round() as i128)));
if page.is_empty() {
break;
}
logs.extend(page);
if logs.len() >= max_lines {
break;
}
}
Ok(logs)
}
fn unix_timestamp(ts: OffsetDateTime) -> f64 {
let nanos_per_second = 1_000_000_000;
let timestamp = ts.unix_timestamp_nanos();
let nanos = timestamp % nanos_per_second;
let secs = timestamp / nanos_per_second;
(secs as f64) + (nanos as f64 / nanos_per_second as f64)
}