pub mod gql;
use std::{collections::HashSet, time::Duration};
use anyhow::Context;
use cynic::{http::ReqwestExt, MutationBuilder, Operation, QueryBuilder};
use futures::StreamExt;
use time::OffsetDateTime;
use tracing::Instrument;
use url::Url;
use wasmer_deploy_schema::schema::{DeploymentV1, WebcPackageIdentifierV1};
use webc::v1::WebCOwned;
use crate::backend::gql::Log;
use self::gql::{
CreateNamespaceVariables, DeployApp, DeployAppVersion, GetDeployAppAndVersion,
GetNamespaceAppsVariables, PublishDeployAppVariables,
};
/// GraphQL endpoint URL returned (parsed) by [`endpoint_dev`].
const ENDPOINT_DEV: &str = "https://registry.wapm.dev/graphql";
/// GraphQL endpoint URL returned (parsed) by [`endpoint_prod`].
const ENDPOINT_PROD: &str = "https://registry.wapm.io/graphql";
/// Returns [`ENDPOINT_DEV`] parsed into a [`Url`].
///
/// # Panics
///
/// Panics if the constant is not a valid URL.
pub fn endpoint_dev() -> Url {
    ENDPOINT_DEV.parse::<Url>().unwrap()
}
/// Returns [`ENDPOINT_PROD`] parsed into a [`Url`].
///
/// # Panics
///
/// Panics if the constant is not a valid URL.
pub fn endpoint_prod() -> Url {
    ENDPOINT_PROD.parse::<Url>().unwrap()
}
/// Client for a GraphQL backend reached over HTTP.
#[derive(Clone)]
pub struct BackendClient {
/// HTTP client used to send every request.
client: reqwest::Client,
/// Token attached as bearer authentication to GraphQL requests when set.
auth_token: Option<String>,
/// URL that GraphQL operations are POSTed to.
graphql_endpoint: Url,
}
impl BackendClient {
    /// Creates a client without an auth token that reuses an existing
    /// `reqwest::Client`.
    pub fn with_client(client: reqwest::Client, graphql_endpoint: Url) -> Self {
        let auth_token = None;
        Self {
            client,
            auth_token,
            graphql_endpoint,
        }
    }

    /// Creates a client without an auth token, backed by a fresh
    /// `reqwest::Client`.
    pub fn new(graphql_endpoint: Url) -> Self {
        Self::with_client(reqwest::Client::new(), graphql_endpoint)
    }

    /// Returns this client with `auth_token` stored as its token.
    pub fn with_auth_token(self, auth_token: String) -> Self {
        Self {
            auth_token: Some(auth_token),
            ..self
        }
    }

    /// POSTs `operation` to the GraphQL endpoint and returns its response data.
    ///
    /// The stored auth token, if any, is sent as bearer authentication.
    ///
    /// # Errors
    ///
    /// Fails when the request itself fails, when the response carries errors
    /// but no data, or when it carries neither. If the response carries both
    /// data and a non-empty error list, the errors are only logged as a
    /// warning and the data is returned.
    pub async fn run_graphql<ResponseData, Vars>(
        &self,
        operation: Operation<ResponseData, Vars>,
    ) -> Result<ResponseData, anyhow::Error>
    where
        Vars: serde::Serialize,
        ResponseData: serde::de::DeserializeOwned + 'static,
    {
        let mut request = self.client.post(self.graphql_endpoint.as_str());
        if let Some(token) = &self.auth_token {
            request = request.bearer_auth(token);
        }

        tracing::trace!(
            query=%operation.query,
            "running GraphQL query"
        );

        // `operation` is moved into the request, so keep the query text for
        // the warning below.
        let query = operation.query.clone();
        let response = request.run_graphql(operation).await?;

        match (response.data, response.errors) {
            (Some(data), errors) => {
                if let Some(errors) = errors.filter(|list| !list.is_empty()) {
                    tracing::warn!(
                        ?errors,
                        %query,
                        endpoint=%self.graphql_endpoint,
                        "GraphQL query succeeded, but returned errors",
                    );
                }
                Ok(data)
            }
            (None, Some(errs)) => {
                let rendered: Vec<String> = errs.iter().map(|error| error.to_string()).collect();
                let errs = rendered.join(", ");
                Err(anyhow::anyhow!("GraphQL query failed: {errs}"))
            }
            (None, None) => Err(anyhow::anyhow!("Query did not return any data")),
        }
    }

    /// Returns the URL GraphQL operations are sent to.
    pub fn graphql_endpoint(&self) -> &Url {
        let Self {
            graphql_endpoint, ..
        } = self;
        graphql_endpoint
    }

    /// Returns the stored auth token, if one was set.
    pub fn auth_token(&self) -> Option<&str> {
        self.auth_token.as_ref().map(String::as_str)
    }
}
/// Downloads the package identified by `ident` and parses it as a webc
/// container.
///
/// The download URL comes from `ident.build_download_url()` and the request
/// asks for the `application/webc` content type.
///
/// # Errors
///
/// Fails if the request fails, the server answers with an error status, the
/// body cannot be read, or the body cannot be parsed as webc.
pub async fn fetch_webc_package(
    client: &BackendClient,
    ident: &WebcPackageIdentifierV1,
) -> Result<WebCOwned, anyhow::Error> {
    let download_url = ident.build_download_url();
    let request = client
        .client
        .get(download_url)
        .header(reqwest::header::ACCEPT, "application/webc");
    let response = request.send().await?.error_for_status()?;
    let body = response.bytes().await?;

    let parse_options = webc::v1::ParseOptions::default();
    WebCOwned::parse(body, &parse_options).context("could not parse webc")
}
/// Runs the `GetCurrentUser` query and returns its `viewer`.
///
/// # Errors
///
/// Fails if the query fails or if the response has no `viewer`
/// ("not logged in").
pub async fn current_user(
    client: &BackendClient,
) -> Result<gql::UserWithNamespaces, anyhow::Error> {
    let response = client.run_graphql(gql::GetCurrentUser::build(())).await?;
    response.viewer.context("not logged in")
}
/// Runs the `GetDeployApp` query for the app `name` owned by `owner`.
///
/// Returns the `get_deploy_app` field of the response, which may be `None`.
///
/// # Errors
///
/// Fails if the query fails.
pub async fn get_app(
    client: &BackendClient,
    owner: String,
    name: String,
) -> Result<Option<gql::DeployApp>, anyhow::Error> {
    let vars = gql::GetDeployAppVariables { name, owner };
    let response = client.run_graphql(gql::GetDeployApp::build(vars)).await?;
    Ok(response.get_deploy_app)
}
/// Runs the `GetDeployAppVersion` query for `version` of the app `name` owned
/// by `owner`.
///
/// Returns the `get_deploy_app_version` field of the response, which may be
/// `None`.
///
/// # Errors
///
/// Fails if the query fails.
pub async fn get_app_version(
    client: &BackendClient,
    owner: String,
    name: String,
    version: String,
) -> Result<Option<gql::DeployAppVersion>, anyhow::Error> {
    let vars = gql::GetDeployAppVersionVariables {
        name,
        owner,
        version,
    };
    let response = client
        .run_graphql(gql::GetDeployAppVersion::build(vars))
        .await?;
    Ok(response.get_deploy_app_version)
}
/// Runs the `GetDeployAppAndVersion` query and returns the whole response.
///
/// # Errors
///
/// Fails if the query fails.
pub async fn get_app_with_version(
    client: &BackendClient,
    owner: String,
    name: String,
    version: String,
) -> Result<GetDeployAppAndVersion, anyhow::Error> {
    let vars = gql::GetDeployAppAndVersionVariables {
        name,
        owner,
        version,
    };
    let operation = gql::GetDeployAppAndVersion::build(vars);
    client.run_graphql(operation).await
}
/// Runs the `GetDeployAppById` query and converts the result into a
/// [`DeployApp`].
///
/// # Errors
///
/// Fails if the query fails, if the response has no `app` ("app not found"),
/// or if `into_deploy_app` does not produce an app ("app conversion failed").
pub async fn get_app_by_id(
    client: &BackendClient,
    app_id: String,
) -> Result<DeployApp, anyhow::Error> {
    let vars = gql::GetDeployAppByIdVars {
        app_id: app_id.into(),
    };
    let response = client
        .run_graphql(gql::GetDeployAppById::build(vars))
        .await?;
    let node = response.app.context("app not found")?;
    node.into_deploy_app().context("app conversion failed")
}
/// Runs the `GetDeployAppAndVersionById` query and converts both results.
///
/// The app is resolved before the version, so an app failure is reported
/// first when both are missing.
///
/// # Errors
///
/// Fails if the query fails, if the app or version is absent from the
/// response, or if either conversion fails.
pub async fn get_app_with_version_by_id(
    client: &BackendClient,
    app_id: String,
    version_id: String,
) -> Result<(DeployApp, DeployAppVersion), anyhow::Error> {
    let vars = gql::GetDeployAppAndVersionByIdVars {
        app_id: app_id.into(),
        version_id: version_id.into(),
    };
    let response = client
        .run_graphql(gql::GetDeployAppAndVersionById::build(vars))
        .await?;

    let app_node = response.app.context("app not found")?;
    let app = app_node
        .into_deploy_app()
        .context("app conversion failed")?;

    let version_node = response.version.context("version not found")?;
    let version = version_node
        .into_deploy_app_version()
        .context("version conversion failed")?;

    Ok((app, version))
}
/// Runs the `GetCurrentUserWithApps` query and returns the viewer's apps.
///
/// Edges that are `None`, or whose `node` is `None`, are skipped.
///
/// # Errors
///
/// Fails if the query fails or if the response has no `viewer`
/// ("not logged in").
pub async fn user_apps(client: &BackendClient) -> Result<Vec<gql::DeployApp>, anyhow::Error> {
    let response = client
        .run_graphql(gql::GetCurrentUserWithApps::build(()))
        .await?;
    let viewer = response.viewer.context("not logged in")?;

    Ok(viewer
        .apps
        .edges
        .into_iter()
        .flatten()
        .filter_map(|edge| edge.node)
        .collect())
}
/// Returns the viewer's own apps followed by the apps of each of the viewer's
/// namespaces.
///
/// The viewer's apps come from [`user_apps`]. The namespace names come from
/// a `GetCurrentUser` query, and each namespace is then queried with
/// `GetNamespaceApps`, one request per namespace, in order. Namespaces the
/// backend returns no data for are skipped.
///
/// # Errors
///
/// Fails if any query fails or if the response has no `viewer`
/// ("not logged in").
pub async fn user_accessible_apps(
    client: &BackendClient,
) -> Result<Vec<gql::DeployApp>, anyhow::Error> {
    // Start from the viewer's own apps and append namespace apps below.
    let mut apps = user_apps(client).await?;

    let viewer = client
        .run_graphql(gql::GetCurrentUser::build(()))
        .await?
        .viewer
        .context("not logged in")?;

    let mut namespace_names = Vec::new();
    for edge in viewer.namespaces.edges.iter().flatten() {
        if let Some(node) = edge.node.as_ref() {
            namespace_names.push(node.name.clone());
        }
    }

    for namespace in namespace_names {
        let vars = GetNamespaceAppsVariables {
            name: namespace.to_string(),
        };
        let response = client
            .run_graphql(gql::GetNamespaceApps::build(vars))
            .await?;

        // A missing namespace contributes nothing; missing edges/nodes are dropped.
        let namespace_apps = response
            .get_namespace
            .into_iter()
            .flat_map(|ns| ns.apps.edges)
            .flatten()
            .filter_map(|edge| edge.node);
        apps.extend(namespace_apps);
    }

    Ok(apps)
}
/// Runs the `GetNamespaceApps` query and returns the apps of `namespace`.
///
/// Edges that are `None`, or whose `node` is `None`, are skipped.
///
/// # Errors
///
/// Fails if the query fails or if the response contains no namespace.
pub async fn namespace_apps(
    client: &BackendClient,
    namespace: &str,
) -> Result<Vec<gql::DeployApp>, anyhow::Error> {
    let vars = GetNamespaceAppsVariables {
        name: namespace.to_string(),
    };
    let response = client
        .run_graphql(gql::GetNamespaceApps::build(vars))
        .await?;
    let ns = response
        .get_namespace
        .with_context(|| format!("failed to get namespace '{}'", namespace))?;

    Ok(ns
        .apps
        .edges
        .into_iter()
        .flatten()
        .filter_map(|edge| edge.node)
        .collect())
}
/// Runs the `PublishDeployApp` mutation and returns the resulting app version.
///
/// # Errors
///
/// Fails if the mutation fails or if the response has no
/// `publish_deploy_app` payload ("failed to publish app").
pub async fn publish_deploy_app(
    client: &BackendClient,
    vars: PublishDeployAppVariables,
) -> Result<DeployAppVersion, anyhow::Error> {
    let response = client
        .run_graphql(gql::PublishDeployApp::build(vars))
        .await?;
    let payload = response
        .publish_deploy_app
        .context("failed to publish app")?;
    Ok(payload.deploy_app_version)
}
/// Runs the `GetCurrentUser` query and returns the viewer's namespaces.
///
/// Edges that are `None`, or whose `node` is `None`, are skipped.
///
/// # Errors
///
/// Fails if the query fails or if the response has no `viewer`
/// ("not logged in").
pub async fn user_namespaces(client: &BackendClient) -> Result<Vec<gql::Namespace>, anyhow::Error> {
    let response = client.run_graphql(gql::GetCurrentUser::build(())).await?;
    let viewer = response.viewer.context("not logged in")?;

    let namespaces = viewer
        .namespaces
        .edges
        .into_iter()
        .filter_map(|edge| edge.and_then(|inner| inner.node))
        .collect();
    Ok(namespaces)
}
/// Runs the `GetNamespace` query for the namespace called `name`.
///
/// Returns the `get_namespace` field of the response, which may be `None`.
///
/// # Errors
///
/// Fails if the query fails.
pub async fn get_namespace(
    client: &BackendClient,
    name: String,
) -> Result<Option<gql::Namespace>, anyhow::Error> {
    let vars = gql::GetNamespaceVariables { name };
    let response = client.run_graphql(gql::GetNamespace::build(vars)).await?;
    Ok(response.get_namespace)
}
/// Runs the `CreateNamespace` mutation and returns the namespace from its
/// payload.
///
/// # Errors
///
/// Fails if the mutation fails or if the response has no `create_namespace`
/// payload ("no namespace returned").
pub async fn create_namespace(
    client: &BackendClient,
    vars: CreateNamespaceVariables,
) -> Result<gql::Namespace, anyhow::Error> {
    let response = client
        .run_graphql(gql::CreateNamespace::build(vars))
        .await?;
    let payload = response
        .create_namespace
        .context("no namespace returned")?;
    Ok(payload.namespace)
}
/// Runs the `GetPackage` query for the package called `name`.
///
/// Returns the `get_package` field of the response, which may be `None`.
///
/// # Errors
///
/// Fails if the query fails.
pub async fn get_package(
    client: &BackendClient,
    name: String,
) -> Result<Option<gql::Package>, anyhow::Error> {
    let vars = gql::GetPackageVariables { name };
    let response = client.run_graphql(gql::GetPackage::build(vars)).await?;
    Ok(response.get_package)
}
/// Runs the `GenerateDeployToken` mutation for `app_version_id` and returns
/// the token string from its payload.
///
/// # Errors
///
/// Fails if the mutation fails or if the response has no
/// `generate_deploy_token` payload ("no token returned").
pub async fn generate_deploy_token_raw(
    client: &BackendClient,
    app_version_id: String,
) -> Result<String, anyhow::Error> {
    let vars = gql::GenerateDeployTokenVariables { app_version_id };
    let response = client
        .run_graphql(gql::GenerateDeployToken::build(vars))
        .await?;
    let payload = response
        .generate_deploy_token
        .context("no token returned")?;
    Ok(payload.token)
}
/// Selects the input sent by [`generate_deploy_config_token_raw`].
#[derive(Debug, PartialEq)]
pub enum TokenKind {
/// Sends an empty JSON object (`{}`) as the input.
SSH,
/// Sends the boxed [`DeploymentV1`], serialized as JSON, as the input.
Other(Box<DeploymentV1>),
}
/// Runs the `GenerateDeployConfigToken` mutation and returns the token string
/// from its payload.
///
/// The mutation input is `{}` for [`TokenKind::SSH`], or the JSON
/// serialization of the deployment for [`TokenKind::Other`].
///
/// # Errors
///
/// Fails if the deployment cannot be serialized, if the mutation fails, or if
/// the response has no `generate_deploy_config_token` payload
/// ("no token returned").
pub async fn generate_deploy_config_token_raw(
    client: &BackendClient,
    token_kind: TokenKind,
) -> Result<String, anyhow::Error> {
    let input = match token_kind {
        TokenKind::SSH => "{}".to_string(),
        TokenKind::Other(deploy) => serde_json::to_string(&deploy)?,
    };
    let vars = gql::GenerateDeployConfigTokenVariables { input };

    let response = client
        .run_graphql(gql::GenerateDeployConfigToken::build(vars))
        .await?;
    let payload = response
        .generate_deploy_config_token
        .context("no token returned")?;
    Ok(payload.token)
}
/// Builds a stream of log pages for an app, issuing one `GetDeployAppLogs`
/// query per page.
///
/// Each stream item is one page of [`Log`] entries, requested with
/// `first: Some(10)`. The stream state is the timestamp the next request
/// starts from: it begins at `start` and is advanced past the last entry of
/// every non-empty page. An empty page ends the stream. A failed query, a
/// missing app version ("unknown package version"), or an uninterpretable
/// timestamp is yielded as an error.
///
/// * `name`, `owner`, `tag` - sent as the query's `name`, `owner` and `version`.
/// * `start` - timestamp the first request starts from.
/// * `end` - optional bound sent as `until` on every request.
#[tracing::instrument(skip_all)]
#[allow(clippy::let_with_type_underscore)]
fn get_app_logs(
client: &BackendClient,
name: String,
owner: String,
tag: Option<String>,
start: OffsetDateTime,
end: Option<OffsetDateTime>,
) -> impl futures::Stream<Item = Result<Vec<Log>, anyhow::Error>> + '_ {
// Capture the current span so that every page request can be instrumented with it.
let span = tracing::Span::current();
futures::stream::try_unfold(start, move |start| {
// The closure runs once per page, so the owned inputs are cloned each time.
let variables = gql::GetDeployAppLogsVariables {
name: name.clone(),
owner: owner.clone(),
version: tag.clone(),
first: Some(10),
// NOTE(review): `unix_timestamp` produces fractional seconds, while the
// timestamps read back from the logs below are interpreted as nanoseconds —
// confirm the backend really uses different units for the two.
starting_from: unix_timestamp(start),
until: end.map(unix_timestamp),
};
let fut = async move {
let deploy_app_version = client
.run_graphql(gql::GetDeployAppLogs::build(variables))
.await?
.get_deploy_app_version
.context("unknown package version")?;
// Drop missing edges and edges without a node.
let page: Vec<_> = deploy_app_version
.logs
.edges
.into_iter()
.flatten()
.filter_map(|edge| edge.node)
.collect();
if page.is_empty() {
// Returning `None` ends the stream.
Ok(None)
} else {
let last_message = page.last().expect("The page is non-empty");
let timestamp = last_message.timestamp;
let timestamp = OffsetDateTime::from_unix_timestamp_nanos(timestamp as i128)
.with_context(|| {
format!("Unable to interpret {timestamp} as a unix timestamp")
})?;
// The next request starts 1_000 ns after the last entry of this page —
// presumably so that entry is not fetched again; TODO confirm.
let next_timestamp = timestamp + Duration::from_nanos(1_000);
Ok(Some((page, next_timestamp)))
}
};
fut.instrument(span.clone())
})
}
/// Collects app logs page by page and returns at most `max_lines` entries.
///
/// Pages come from [`get_app_logs`]. Entries whose message and rounded
/// timestamp were already collected are dropped. Collection stops when the
/// stream ends, when a page contributes no new entries, or once `max_lines`
/// entries have been gathered.
///
/// * `name`, `owner`, `tag`, `start`, `end` - forwarded to [`get_app_logs`].
/// * `max_lines` - upper bound on the number of returned entries.
///
/// # Errors
///
/// Returns the first error produced by the underlying stream.
#[tracing::instrument(skip_all)]
#[allow(clippy::let_with_type_underscore)]
pub async fn get_app_logs_paginated(
    client: &BackendClient,
    name: String,
    owner: String,
    tag: Option<String>,
    start: OffsetDateTime,
    end: Option<OffsetDateTime>,
    max_lines: usize,
) -> Result<Vec<Log>, anyhow::Error> {
    let mut logs = Vec::new();
    let stream = get_app_logs(client, name, owner, tag, start, end);
    futures::pin_mut!(stream);

    // Keys (message, rounded timestamp) of every entry collected so far.
    let mut seen = HashSet::new();

    while let Some(res) = stream.next().await {
        let mut page = res?;

        // `insert` returns false for keys already present, which drops repeats.
        page.retain(|log| seen.insert((log.message.clone(), log.timestamp.round() as i128)));

        // A page with nothing new means no progress is being made.
        if page.is_empty() {
            break;
        }

        logs.extend(page);

        if logs.len() >= max_lines {
            break;
        }
    }

    // Whole pages are appended, so the final page can overshoot the limit;
    // cut the result back so callers never get more than they asked for.
    logs.truncate(max_lines);

    Ok(logs)
}
/// Converts `ts` into seconds since the Unix epoch as an `f64`, keeping the
/// sub-second part as the fraction.
fn unix_timestamp(ts: OffsetDateTime) -> f64 {
    const NANOS_PER_SECOND: i128 = 1_000_000_000;

    let total_nanos = ts.unix_timestamp_nanos();
    let whole_seconds = (total_nanos / NANOS_PER_SECOND) as f64;
    let fractional_seconds = (total_nanos % NANOS_PER_SECOND) as f64 / NANOS_PER_SECOND as f64;

    whole_seconds + fractional_seconds
}