wasmer_api/
query.rs

1use std::{collections::HashSet, time::Duration};
2
3use anyhow::{bail, Context};
4use cynic::{MutationBuilder, QueryBuilder};
5use futures::StreamExt;
6use merge_streams::MergeStreams;
7use time::OffsetDateTime;
8use tracing::Instrument;
9use url::Url;
10use wasmer_config::package::PackageIdent;
11
12use crate::{
13    types::{self, *},
14    GraphQLApiFailure, WasmerClient,
15};
16
17/// Rotate the s3 secrets tied to an app given its id.
18pub async fn rotate_s3_secrets(
19    client: &WasmerClient,
20    app_id: types::Id,
21) -> Result<(), anyhow::Error> {
22    client
23        .run_graphql_strict(types::RotateS3SecretsForApp::build(
24            RotateS3SecretsForAppVariables { id: app_id },
25        ))
26        .await?;
27
28    Ok(())
29}
30
31pub async fn viewer_can_deploy_to_namespace(
32    client: &WasmerClient,
33    owner_name: &str,
34) -> Result<bool, anyhow::Error> {
35    client
36        .run_graphql_strict(types::ViewerCan::build(ViewerCanVariables {
37            action: OwnerAction::DeployApp,
38            owner_name,
39        }))
40        .await
41        .map(|v| v.viewer_can)
42}
43
44pub async fn redeploy_app_by_id(
45    client: &WasmerClient,
46    app_id: impl Into<String>,
47) -> Result<Option<DeployApp>, anyhow::Error> {
48    client
49        .run_graphql_strict(types::RedeployActiveApp::build(
50            RedeployActiveAppVariables {
51                id: types::Id::from(app_id),
52            },
53        ))
54        .await
55        .map(|v| v.redeploy_active_version.map(|v| v.app))
56}
57
58/// Revoke an existing token
59pub async fn revoke_token(
60    client: &WasmerClient,
61    token: String,
62) -> Result<Option<bool>, anyhow::Error> {
63    client
64        .run_graphql_strict(types::RevokeToken::build(RevokeTokenVariables { token }))
65        .await
66        .map(|v| v.revoke_api_token.and_then(|v| v.success))
67}
68
69/// Generate a new Nonce
70///
71/// Takes a name and a callbackUrl and returns a nonce
72pub async fn create_nonce(
73    client: &WasmerClient,
74    name: String,
75    callback_url: String,
76) -> Result<Option<Nonce>, anyhow::Error> {
77    client
78        .run_graphql_strict(types::CreateNewNonce::build(CreateNewNonceVariables {
79            callback_url,
80            name,
81        }))
82        .await
83        .map(|v| v.new_nonce.map(|v| v.nonce))
84}
85
86pub async fn get_app_secret_value_by_id(
87    client: &WasmerClient,
88    secret_id: impl Into<String>,
89) -> Result<Option<String>, anyhow::Error> {
90    client
91        .run_graphql_strict(types::GetAppSecretValue::build(
92            GetAppSecretValueVariables {
93                id: types::Id::from(secret_id),
94            },
95        ))
96        .await
97        .map(|v| v.get_secret_value)
98}
99
100pub async fn get_app_secret_by_name(
101    client: &WasmerClient,
102    app_id: impl Into<String>,
103    name: impl Into<String>,
104) -> Result<Option<Secret>, anyhow::Error> {
105    client
106        .run_graphql_strict(types::GetAppSecret::build(GetAppSecretVariables {
107            app_id: types::Id::from(app_id),
108            secret_name: name.into(),
109        }))
110        .await
111        .map(|v| v.get_app_secret)
112}
113
114/// Update or create an app secret.
115pub async fn upsert_app_secret(
116    client: &WasmerClient,
117    app_id: impl Into<String>,
118    name: impl Into<String>,
119    value: impl Into<String>,
120) -> Result<Option<UpsertAppSecretPayload>, anyhow::Error> {
121    client
122        .run_graphql_strict(types::UpsertAppSecret::build(UpsertAppSecretVariables {
123            app_id: cynic::Id::from(app_id.into()),
124            name: name.into().as_str(),
125            value: value.into().as_str(),
126        }))
127        .await
128        .map(|v| v.upsert_app_secret)
129}
130
131/// Update or create app secrets in bulk.
132pub async fn upsert_app_secrets(
133    client: &WasmerClient,
134    app_id: impl Into<String>,
135    secrets: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
136) -> Result<Option<UpsertAppSecretsPayload>, anyhow::Error> {
137    client
138        .run_graphql_strict(types::UpsertAppSecrets::build(UpsertAppSecretsVariables {
139            app_id: cynic::Id::from(app_id.into()),
140            secrets: Some(
141                secrets
142                    .into_iter()
143                    .map(|(name, value)| SecretInput {
144                        name: name.into(),
145                        value: value.into(),
146                    })
147                    .collect(),
148            ),
149        }))
150        .await
151        .map(|v| v.upsert_app_secrets)
152}
153
154/// Load all secrets of an app.
155///
156/// Will paginate through all versions and return them in a single list.
157pub async fn get_all_app_secrets_filtered(
158    client: &WasmerClient,
159    app_id: impl Into<String>,
160    names: impl IntoIterator<Item = impl Into<String>>,
161) -> Result<Vec<Secret>, anyhow::Error> {
162    let mut vars = GetAllAppSecretsVariables {
163        after: None,
164        app_id: types::Id::from(app_id),
165        before: None,
166        first: None,
167        last: None,
168        offset: None,
169        names: Some(names.into_iter().map(|s| s.into()).collect()),
170    };
171
172    let mut all_secrets = Vec::<Secret>::new();
173
174    loop {
175        let page = get_app_secrets(client, vars.clone()).await?;
176        if page.edges.is_empty() {
177            break;
178        }
179
180        for edge in page.edges {
181            let edge = match edge {
182                Some(edge) => edge,
183                None => continue,
184            };
185            let version = match edge.node {
186                Some(item) => item,
187                None => continue,
188            };
189
190            all_secrets.push(version);
191
192            // Update pagination.
193            vars.after = Some(edge.cursor);
194        }
195    }
196
197    Ok(all_secrets)
198}
199
200/// Retrieve volumes for an app.
201pub async fn get_app_volumes(
202    client: &WasmerClient,
203    owner: impl Into<String>,
204    name: impl Into<String>,
205) -> Result<Vec<types::AppVersionVolume>, anyhow::Error> {
206    let vars = types::GetAppVolumesVars {
207        owner: owner.into(),
208        name: name.into(),
209    };
210    let res = client
211        .run_graphql_strict(types::GetAppVolumes::build(vars))
212        .await?;
213    let volumes = res
214        .get_deploy_app
215        .context("app not found")?
216        .active_version
217        .volumes
218        .unwrap_or_default()
219        .into_iter()
220        .flatten()
221        .collect();
222    Ok(volumes)
223}
224
225/// Load the S3 credentials.
226///
227/// S3 can be used to get access to an apps volumes.
228pub async fn get_app_s3_credentials(
229    client: &WasmerClient,
230    app_id: impl Into<String>,
231) -> Result<types::S3Credentials, anyhow::Error> {
232    let app_id = app_id.into();
233
234    // Firt load the app to get the s3 url.
235    let app1 = get_app_by_id(client, app_id.clone()).await?;
236
237    let vars = types::GetDeployAppVars {
238        owner: app1.owner.global_name,
239        name: app1.name,
240    };
241    client
242        .run_graphql_strict(types::GetDeployAppS3Credentials::build(vars))
243        .await?
244        .get_deploy_app
245        .context("app not found")?
246        .s3_credentials
247        .context("app does not have S3 credentials")
248}
249
250/// Load all available regions.
251///
252/// Will paginate through all versions and return them in a single list.
253pub async fn get_all_app_regions(client: &WasmerClient) -> Result<Vec<AppRegion>, anyhow::Error> {
254    let mut vars = GetAllAppRegionsVariables {
255        after: None,
256        before: None,
257        first: None,
258        last: None,
259        offset: None,
260    };
261
262    let mut all_regions = Vec::<AppRegion>::new();
263
264    loop {
265        let page = get_regions(client, vars.clone()).await?;
266        if page.edges.is_empty() {
267            break;
268        }
269
270        for edge in page.edges {
271            let edge = match edge {
272                Some(edge) => edge,
273                None => continue,
274            };
275            let version = match edge.node {
276                Some(item) => item,
277                None => continue,
278            };
279
280            all_regions.push(version);
281
282            // Update pagination.
283            vars.after = Some(edge.cursor);
284        }
285    }
286
287    Ok(all_regions)
288}
289
290/// Retrieve regions.
291pub async fn get_regions(
292    client: &WasmerClient,
293    vars: GetAllAppRegionsVariables,
294) -> Result<AppRegionConnection, anyhow::Error> {
295    let res = client
296        .run_graphql_strict(types::GetAllAppRegions::build(vars))
297        .await?;
298    Ok(res.get_app_regions)
299}
300
301/// Load all secrets of an app.
302///
303/// Will paginate through all versions and return them in a single list.
304pub async fn get_all_app_secrets(
305    client: &WasmerClient,
306    app_id: impl Into<String>,
307) -> Result<Vec<Secret>, anyhow::Error> {
308    let mut vars = GetAllAppSecretsVariables {
309        after: None,
310        app_id: types::Id::from(app_id),
311        before: None,
312        first: None,
313        last: None,
314        offset: None,
315        names: None,
316    };
317
318    let mut all_secrets = Vec::<Secret>::new();
319
320    loop {
321        let page = get_app_secrets(client, vars.clone()).await?;
322        if page.edges.is_empty() {
323            break;
324        }
325
326        for edge in page.edges {
327            let edge = match edge {
328                Some(edge) => edge,
329                None => continue,
330            };
331            let version = match edge.node {
332                Some(item) => item,
333                None => continue,
334            };
335
336            all_secrets.push(version);
337
338            // Update pagination.
339            vars.after = Some(edge.cursor);
340        }
341    }
342
343    Ok(all_secrets)
344}
345
346/// Retrieve secrets for an app.
347pub async fn get_app_secrets(
348    client: &WasmerClient,
349    vars: GetAllAppSecretsVariables,
350) -> Result<SecretConnection, anyhow::Error> {
351    let res = client
352        .run_graphql_strict(types::GetAllAppSecrets::build(vars))
353        .await?;
354    res.get_app_secrets.context("app not found")
355}
356
357pub async fn delete_app_secret(
358    client: &WasmerClient,
359    secret_id: impl Into<String>,
360) -> Result<Option<DeleteAppSecretPayload>, anyhow::Error> {
361    client
362        .run_graphql_strict(types::DeleteAppSecret::build(DeleteAppSecretVariables {
363            id: types::Id::from(secret_id.into()),
364        }))
365        .await
366        .map(|v| v.delete_app_secret)
367}
368
369/// Load a webc package from the registry.
370///
371/// NOTE: this uses the public URL instead of the download URL available through
372/// the API, and should not be used where possible.
373pub async fn fetch_webc_package(
374    client: &WasmerClient,
375    ident: &PackageIdent,
376    default_registry: &Url,
377) -> Result<webc::compat::Container, anyhow::Error> {
378    let url = match ident {
379        PackageIdent::Named(n) => Url::parse(&format!(
380            "{default_registry}/{}:{}",
381            n.full_name(),
382            n.version_or_default()
383        ))?,
384        PackageIdent::Hash(h) => match get_package_release(client, &h.to_string()).await? {
385            Some(webc) => Url::parse(&webc.webc_url)?,
386            None => anyhow::bail!("Could not find package with hash '{}'", h),
387        },
388    };
389
390    let data = client
391        .client
392        .get(url)
393        .header(reqwest::header::USER_AGENT, &client.user_agent)
394        .header(reqwest::header::ACCEPT, "application/webc")
395        .send()
396        .await?
397        .error_for_status()?
398        .bytes()
399        .await?;
400
401    webc::compat::Container::from_bytes(data).context("failed to parse webc package")
402}
403
404/// Fetch app templates.
405pub async fn fetch_app_template_from_slug(
406    client: &WasmerClient,
407    slug: String,
408) -> Result<Option<types::AppTemplate>, anyhow::Error> {
409    client
410        .run_graphql_strict(types::GetAppTemplateFromSlug::build(
411            GetAppTemplateFromSlugVariables { slug },
412        ))
413        .await
414        .map(|v| v.get_app_template)
415}
416
417/// Fetch app templates.
418pub async fn fetch_app_templates_from_framework(
419    client: &WasmerClient,
420    framework_slug: String,
421    first: i32,
422    after: Option<String>,
423    sort_by: Option<types::AppTemplatesSortBy>,
424) -> Result<Option<types::AppTemplateConnection>, anyhow::Error> {
425    client
426        .run_graphql_strict(types::GetAppTemplatesFromFramework::build(
427            GetAppTemplatesFromFrameworkVars {
428                framework_slug,
429                first,
430                after,
431                sort_by,
432            },
433        ))
434        .await
435        .map(|r| r.get_app_templates)
436}
437
438/// Fetch app templates.
439pub async fn fetch_app_templates(
440    client: &WasmerClient,
441    category_slug: String,
442    first: i32,
443    after: Option<String>,
444    sort_by: Option<types::AppTemplatesSortBy>,
445) -> Result<Option<types::AppTemplateConnection>, anyhow::Error> {
446    client
447        .run_graphql_strict(types::GetAppTemplates::build(GetAppTemplatesVars {
448            category_slug,
449            first,
450            after,
451            sort_by,
452        }))
453        .await
454        .map(|r| r.get_app_templates)
455}
456
457/// Fetch all app templates by paginating through the responses.
458///
459/// Will fetch at most `max` templates.
460pub fn fetch_all_app_templates(
461    client: &WasmerClient,
462    page_size: i32,
463    sort_by: Option<types::AppTemplatesSortBy>,
464) -> impl futures::Stream<Item = Result<Vec<types::AppTemplate>, anyhow::Error>> + '_ {
465    let vars = GetAppTemplatesVars {
466        category_slug: String::new(),
467        first: page_size,
468        sort_by,
469        after: None,
470    };
471
472    futures::stream::try_unfold(
473        Some(vars),
474        move |vars: Option<types::GetAppTemplatesVars>| async move {
475            let vars = match vars {
476                Some(vars) => vars,
477                None => return Ok(None),
478            };
479
480            let con = client
481                .run_graphql_strict(types::GetAppTemplates::build(vars.clone()))
482                .await?
483                .get_app_templates
484                .context("backend did not return any data")?;
485
486            let items = con
487                .edges
488                .into_iter()
489                .flatten()
490                .filter_map(|edge| edge.node)
491                .collect::<Vec<_>>();
492
493            let next_cursor = con
494                .page_info
495                .end_cursor
496                .filter(|_| con.page_info.has_next_page);
497
498            let next_vars = next_cursor.map(|after| types::GetAppTemplatesVars {
499                after: Some(after),
500                ..vars
501            });
502
503            #[allow(clippy::type_complexity)]
504            let res: Result<
505                Option<(Vec<types::AppTemplate>, Option<types::GetAppTemplatesVars>)>,
506                anyhow::Error,
507            > = Ok(Some((items, next_vars)));
508
509            res
510        },
511    )
512}
513
514/// Fetch all app templates by paginating through the responses.
515///
516/// Will fetch at most `max` templates.
517pub fn fetch_all_app_templates_from_language(
518    client: &WasmerClient,
519    page_size: i32,
520    sort_by: Option<types::AppTemplatesSortBy>,
521    language: String,
522) -> impl futures::Stream<Item = Result<Vec<types::AppTemplate>, anyhow::Error>> + '_ {
523    let vars = GetAppTemplatesFromLanguageVars {
524        language_slug: language.clone().to_string(),
525        first: page_size,
526        sort_by,
527        after: None,
528    };
529
530    futures::stream::try_unfold(
531        Some(vars),
532        move |vars: Option<types::GetAppTemplatesFromLanguageVars>| async move {
533            let vars = match vars {
534                Some(vars) => vars,
535                None => return Ok(None),
536            };
537
538            let con = client
539                .run_graphql_strict(types::GetAppTemplatesFromLanguage::build(vars.clone()))
540                .await?
541                .get_app_templates
542                .context("backend did not return any data")?;
543
544            let items = con
545                .edges
546                .into_iter()
547                .flatten()
548                .filter_map(|edge| edge.node)
549                .collect::<Vec<_>>();
550
551            let next_cursor = con
552                .page_info
553                .end_cursor
554                .filter(|_| con.page_info.has_next_page);
555
556            let next_vars = next_cursor.map(|after| types::GetAppTemplatesFromLanguageVars {
557                after: Some(after),
558                ..vars
559            });
560
561            #[allow(clippy::type_complexity)]
562            let res: Result<
563                Option<(
564                    Vec<types::AppTemplate>,
565                    Option<types::GetAppTemplatesFromLanguageVars>,
566                )>,
567                anyhow::Error,
568            > = Ok(Some((items, next_vars)));
569
570            res
571        },
572    )
573}
574
575/// Fetch languages from available app templates.
576pub async fn fetch_app_template_languages(
577    client: &WasmerClient,
578    after: Option<String>,
579    first: Option<i32>,
580) -> Result<Option<types::TemplateLanguageConnection>, anyhow::Error> {
581    client
582        .run_graphql_strict(types::GetTemplateLanguages::build(
583            GetTemplateLanguagesVars { after, first },
584        ))
585        .await
586        .map(|r| r.get_template_languages)
587}
588
589/// Fetch all languages from available app templates by paginating through the responses.
590///
591/// Will fetch at most `max` templates.
592pub fn fetch_all_app_template_languages(
593    client: &WasmerClient,
594    page_size: Option<i32>,
595) -> impl futures::Stream<Item = Result<Vec<types::TemplateLanguage>, anyhow::Error>> + '_ {
596    let vars = GetTemplateLanguagesVars {
597        after: None,
598        first: page_size,
599    };
600
601    futures::stream::try_unfold(
602        Some(vars),
603        move |vars: Option<types::GetTemplateLanguagesVars>| async move {
604            let vars = match vars {
605                Some(vars) => vars,
606                None => return Ok(None),
607            };
608
609            let con = client
610                .run_graphql_strict(types::GetTemplateLanguages::build(vars.clone()))
611                .await?
612                .get_template_languages
613                .context("backend did not return any data")?;
614
615            let items = con
616                .edges
617                .into_iter()
618                .flatten()
619                .filter_map(|edge| edge.node)
620                .collect::<Vec<_>>();
621
622            let next_cursor = con
623                .page_info
624                .end_cursor
625                .filter(|_| con.page_info.has_next_page);
626
627            let next_vars = next_cursor.map(|after| types::GetTemplateLanguagesVars {
628                after: Some(after),
629                ..vars
630            });
631
632            #[allow(clippy::type_complexity)]
633            let res: Result<
634                Option<(
635                    Vec<types::TemplateLanguage>,
636                    Option<types::GetTemplateLanguagesVars>,
637                )>,
638                anyhow::Error,
639            > = Ok(Some((items, next_vars)));
640
641            res
642        },
643    )
644}
645
646/// Fetch all app templates by paginating through the responses.
647///
648/// Will fetch at most `max` templates.
649pub fn fetch_all_app_templates_from_framework(
650    client: &WasmerClient,
651    page_size: i32,
652    sort_by: Option<types::AppTemplatesSortBy>,
653    framework: String,
654) -> impl futures::Stream<Item = Result<Vec<types::AppTemplate>, anyhow::Error>> + '_ {
655    let vars = GetAppTemplatesFromFrameworkVars {
656        framework_slug: framework.clone().to_string(),
657        first: page_size,
658        sort_by,
659        after: None,
660    };
661
662    futures::stream::try_unfold(
663        Some(vars),
664        move |vars: Option<types::GetAppTemplatesFromFrameworkVars>| async move {
665            let vars = match vars {
666                Some(vars) => vars,
667                None => return Ok(None),
668            };
669
670            let con = client
671                .run_graphql_strict(types::GetAppTemplatesFromFramework::build(vars.clone()))
672                .await?
673                .get_app_templates
674                .context("backend did not return any data")?;
675
676            let items = con
677                .edges
678                .into_iter()
679                .flatten()
680                .filter_map(|edge| edge.node)
681                .collect::<Vec<_>>();
682
683            let next_cursor = con
684                .page_info
685                .end_cursor
686                .filter(|_| con.page_info.has_next_page);
687
688            let next_vars = next_cursor.map(|after| types::GetAppTemplatesFromFrameworkVars {
689                after: Some(after),
690                ..vars
691            });
692
693            #[allow(clippy::type_complexity)]
694            let res: Result<
695                Option<(
696                    Vec<types::AppTemplate>,
697                    Option<types::GetAppTemplatesFromFrameworkVars>,
698                )>,
699                anyhow::Error,
700            > = Ok(Some((items, next_vars)));
701
702            res
703        },
704    )
705}
706
707/// Fetch frameworks from available app templates.
708pub async fn fetch_app_template_frameworks(
709    client: &WasmerClient,
710    after: Option<String>,
711    first: Option<i32>,
712) -> Result<Option<types::TemplateFrameworkConnection>, anyhow::Error> {
713    client
714        .run_graphql_strict(types::GetTemplateFrameworks::build(
715            GetTemplateFrameworksVars { after, first },
716        ))
717        .await
718        .map(|r| r.get_template_frameworks)
719}
720
721/// Fetch all frameworks from available app templates by paginating through the responses.
722///
723/// Will fetch at most `max` templates.
724pub fn fetch_all_app_template_frameworks(
725    client: &WasmerClient,
726    page_size: Option<i32>,
727) -> impl futures::Stream<Item = Result<Vec<types::TemplateFramework>, anyhow::Error>> + '_ {
728    let vars = GetTemplateFrameworksVars {
729        after: None,
730        first: page_size,
731    };
732
733    futures::stream::try_unfold(
734        Some(vars),
735        move |vars: Option<types::GetTemplateFrameworksVars>| async move {
736            let vars = match vars {
737                Some(vars) => vars,
738                None => return Ok(None),
739            };
740
741            let con = client
742                .run_graphql_strict(types::GetTemplateFrameworks::build(vars.clone()))
743                .await?
744                .get_template_frameworks
745                .context("backend did not return any data")?;
746
747            let items = con
748                .edges
749                .into_iter()
750                .flatten()
751                .filter_map(|edge| edge.node)
752                .collect::<Vec<_>>();
753
754            let next_cursor = con
755                .page_info
756                .end_cursor
757                .filter(|_| con.page_info.has_next_page);
758
759            let next_vars = next_cursor.map(|after| types::GetTemplateFrameworksVars {
760                after: Some(after),
761                ..vars
762            });
763
764            #[allow(clippy::type_complexity)]
765            let res: Result<
766                Option<(
767                    Vec<types::TemplateFramework>,
768                    Option<types::GetTemplateFrameworksVars>,
769                )>,
770                anyhow::Error,
771            > = Ok(Some((items, next_vars)));
772
773            res
774        },
775    )
776}
777
778/// Get a signed URL to upload packages.
779pub async fn get_signed_url_for_package_upload(
780    client: &WasmerClient,
781    expires_after_seconds: Option<i32>,
782    filename: Option<&str>,
783    name: Option<&str>,
784    version: Option<&str>,
785) -> Result<Option<SignedUrl>, anyhow::Error> {
786    client
787        .run_graphql_strict(types::GetSignedUrlForPackageUpload::build(
788            GetSignedUrlForPackageUploadVariables {
789                expires_after_seconds,
790                filename,
791                name,
792                version,
793            },
794        ))
795        .await
796        .map(|r| r.get_signed_url_for_package_upload)
797}
798/// Push a package to the registry.
799pub async fn push_package_release(
800    client: &WasmerClient,
801    name: Option<&str>,
802    namespace: &str,
803    signed_url: &str,
804    private: Option<bool>,
805) -> Result<Option<PushPackageReleasePayload>, anyhow::Error> {
806    client
807        .run_graphql_strict(types::PushPackageRelease::build(
808            types::PushPackageReleaseVariables {
809                name,
810                namespace,
811                private,
812                signed_url,
813            },
814        ))
815        .await
816        .map(|r| r.push_package_release)
817}
818
819#[allow(clippy::too_many_arguments)]
820pub async fn tag_package_release(
821    client: &WasmerClient,
822    description: Option<&str>,
823    homepage: Option<&str>,
824    license: Option<&str>,
825    license_file: Option<&str>,
826    manifest: Option<&str>,
827    name: &str,
828    namespace: Option<&str>,
829    package_release_id: &cynic::Id,
830    private: Option<bool>,
831    readme: Option<&str>,
832    repository: Option<&str>,
833    version: &str,
834) -> Result<Option<TagPackageReleasePayload>, anyhow::Error> {
835    client
836        .run_graphql_strict(types::TagPackageRelease::build(
837            types::TagPackageReleaseVariables {
838                description,
839                homepage,
840                license,
841                license_file,
842                manifest,
843                name,
844                namespace,
845                package_release_id,
846                private,
847                readme,
848                repository,
849                version,
850            },
851        ))
852        .await
853        .map(|r| r.tag_package_release)
854}
855
856/// Get the currently logged in user.
857pub async fn current_user(client: &WasmerClient) -> Result<Option<types::User>, anyhow::Error> {
858    client
859        .run_graphql(types::GetCurrentUser::build(()))
860        .await
861        .map(|x| x.viewer)
862}
863
864/// Get the currently logged in user, together with all accessible namespaces.
865///
866/// You can optionally filter the namespaces by the user role.
867pub async fn current_user_with_namespaces(
868    client: &WasmerClient,
869    namespace_role: Option<types::GrapheneRole>,
870) -> Result<types::UserWithNamespaces, anyhow::Error> {
871    client
872        .run_graphql(types::GetCurrentUserWithNamespaces::build(
873            types::GetCurrentUserWithNamespacesVars { namespace_role },
874        ))
875        .await?
876        .viewer
877        .context("not logged in")
878}
879
880/// Retrieve an app.
881pub async fn get_app(
882    client: &WasmerClient,
883    owner: String,
884    name: String,
885) -> Result<Option<types::DeployApp>, anyhow::Error> {
886    client
887        .run_graphql(types::GetDeployApp::build(types::GetDeployAppVars {
888            name,
889            owner,
890        }))
891        .await
892        .map(|x| x.get_deploy_app)
893}
894
895/// Retrieve an app by its global alias.
896pub async fn get_app_by_alias(
897    client: &WasmerClient,
898    alias: String,
899) -> Result<Option<types::DeployApp>, anyhow::Error> {
900    client
901        .run_graphql(types::GetDeployAppByAlias::build(
902            types::GetDeployAppByAliasVars { alias },
903        ))
904        .await
905        .map(|x| x.get_app_by_global_alias)
906}
907
908/// Retrieve an app version.
909pub async fn get_app_version(
910    client: &WasmerClient,
911    owner: String,
912    name: String,
913    version: String,
914) -> Result<Option<types::DeployAppVersion>, anyhow::Error> {
915    client
916        .run_graphql(types::GetDeployAppVersion::build(
917            types::GetDeployAppVersionVars {
918                name,
919                owner,
920                version,
921            },
922        ))
923        .await
924        .map(|x| x.get_deploy_app_version)
925}
926
927/// Retrieve an app together with a specific version.
928pub async fn get_app_with_version(
929    client: &WasmerClient,
930    owner: String,
931    name: String,
932    version: String,
933) -> Result<GetDeployAppAndVersion, anyhow::Error> {
934    client
935        .run_graphql(types::GetDeployAppAndVersion::build(
936            types::GetDeployAppAndVersionVars {
937                name,
938                owner,
939                version,
940            },
941        ))
942        .await
943}
944
945/// Retrieve an app together with a specific version.
946pub async fn get_app_and_package_by_name(
947    client: &WasmerClient,
948    vars: types::GetPackageAndAppVars,
949) -> Result<(Option<types::Package>, Option<types::DeployApp>), anyhow::Error> {
950    let res = client
951        .run_graphql(types::GetPackageAndApp::build(vars))
952        .await?;
953    Ok((res.get_package, res.get_deploy_app))
954}
955
956/// Retrieve apps.
957pub async fn get_deploy_apps(
958    client: &WasmerClient,
959    vars: types::GetDeployAppsVars,
960) -> Result<DeployAppConnection, anyhow::Error> {
961    let res = client
962        .run_graphql(types::GetDeployApps::build(vars))
963        .await?;
964    res.get_deploy_apps.context("no apps returned")
965}
966
967/// Retrieve apps as a stream that will automatically paginate.
968pub fn get_deploy_apps_stream(
969    client: &WasmerClient,
970    vars: types::GetDeployAppsVars,
971) -> impl futures::Stream<Item = Result<Vec<DeployApp>, anyhow::Error>> + '_ {
972    futures::stream::try_unfold(
973        Some(vars),
974        move |vars: Option<types::GetDeployAppsVars>| async move {
975            let vars = match vars {
976                Some(vars) => vars,
977                None => return Ok(None),
978            };
979
980            let page = get_deploy_apps(client, vars.clone()).await?;
981
982            let end_cursor = page.page_info.end_cursor;
983
984            let items = page
985                .edges
986                .into_iter()
987                .filter_map(|x| x.and_then(|x| x.node))
988                .collect::<Vec<_>>();
989
990            let new_vars = end_cursor.map(|c| types::GetDeployAppsVars {
991                after: Some(c),
992                ..vars
993            });
994
995            Ok(Some((items, new_vars)))
996        },
997    )
998}
999
1000/// Retrieve versions for an app.
1001pub async fn get_deploy_app_versions(
1002    client: &WasmerClient,
1003    vars: GetDeployAppVersionsVars,
1004) -> Result<DeployAppVersionConnection, anyhow::Error> {
1005    let res = client
1006        .run_graphql_strict(types::GetDeployAppVersions::build(vars))
1007        .await?;
1008    let versions = res.get_deploy_app.context("app not found")?.versions;
1009    Ok(versions)
1010}
1011
1012/// Load all versions of an app.
1013///
1014/// Will paginate through all versions and return them in a single list.
1015pub async fn all_app_versions(
1016    client: &WasmerClient,
1017    owner: String,
1018    name: String,
1019) -> Result<Vec<DeployAppVersion>, anyhow::Error> {
1020    let mut vars = GetDeployAppVersionsVars {
1021        owner,
1022        name,
1023        offset: None,
1024        before: None,
1025        after: None,
1026        first: Some(10),
1027        last: None,
1028        sort_by: None,
1029    };
1030
1031    let mut all_versions = Vec::<DeployAppVersion>::new();
1032
1033    loop {
1034        let page = get_deploy_app_versions(client, vars.clone()).await?;
1035        if page.edges.is_empty() {
1036            break;
1037        }
1038
1039        for edge in page.edges {
1040            let edge = match edge {
1041                Some(edge) => edge,
1042                None => continue,
1043            };
1044            let version = match edge.node {
1045                Some(item) => item,
1046                None => continue,
1047            };
1048
1049            // Sanity check to avoid duplication.
1050            if all_versions.iter().any(|v| v.id == version.id) == false {
1051                all_versions.push(version);
1052            }
1053
1054            // Update pagination.
1055            vars.after = Some(edge.cursor);
1056        }
1057    }
1058
1059    Ok(all_versions)
1060}
1061
1062/// Retrieve versions for an app.
1063pub async fn get_deploy_app_versions_by_id(
1064    client: &WasmerClient,
1065    vars: types::GetDeployAppVersionsByIdVars,
1066) -> Result<DeployAppVersionConnection, anyhow::Error> {
1067    let res = client
1068        .run_graphql_strict(types::GetDeployAppVersionsById::build(vars))
1069        .await?;
1070    let versions = res
1071        .node
1072        .context("app not found")?
1073        .into_app()
1074        .context("invalid node type returned")?
1075        .versions;
1076    Ok(versions)
1077}
1078
1079/// Load all versions of an app id.
1080///
1081/// Will paginate through all versions and return them in a single list.
1082pub async fn all_app_versions_by_id(
1083    client: &WasmerClient,
1084    app_id: impl Into<String>,
1085) -> Result<Vec<DeployAppVersion>, anyhow::Error> {
1086    let mut vars = types::GetDeployAppVersionsByIdVars {
1087        id: cynic::Id::new(app_id),
1088        offset: None,
1089        before: None,
1090        after: None,
1091        first: Some(10),
1092        last: None,
1093        sort_by: None,
1094    };
1095
1096    let mut all_versions = Vec::<DeployAppVersion>::new();
1097
1098    loop {
1099        let page = get_deploy_app_versions_by_id(client, vars.clone()).await?;
1100        if page.edges.is_empty() {
1101            break;
1102        }
1103
1104        for edge in page.edges {
1105            let edge = match edge {
1106                Some(edge) => edge,
1107                None => continue,
1108            };
1109            let version = match edge.node {
1110                Some(item) => item,
1111                None => continue,
1112            };
1113
1114            // Sanity check to avoid duplication.
1115            if all_versions.iter().any(|v| v.id == version.id) == false {
1116                all_versions.push(version);
1117            }
1118
1119            // Update pagination.
1120            vars.after = Some(edge.cursor);
1121        }
1122    }
1123
1124    Ok(all_versions)
1125}
1126
1127/// Activate a particular version of an app.
1128pub async fn app_version_activate(
1129    client: &WasmerClient,
1130    version: String,
1131) -> Result<DeployApp, anyhow::Error> {
1132    let res = client
1133        .run_graphql_strict(types::MarkAppVersionAsActive::build(
1134            types::MarkAppVersionAsActiveVars {
1135                input: types::MarkAppVersionAsActiveInput {
1136                    app_version: version.into(),
1137                },
1138            },
1139        ))
1140        .await?;
1141    res.mark_app_version_as_active
1142        .context("app not found")
1143        .map(|x| x.app)
1144}
1145
1146/// Retrieve a node based on its global id.
1147pub async fn get_node(
1148    client: &WasmerClient,
1149    id: String,
1150) -> Result<Option<types::Node>, anyhow::Error> {
1151    client
1152        .run_graphql(types::GetNode::build(types::GetNodeVars { id: id.into() }))
1153        .await
1154        .map(|x| x.node)
1155}
1156
1157/// Retrieve an app by its global id.
1158pub async fn get_app_by_id(
1159    client: &WasmerClient,
1160    app_id: String,
1161) -> Result<DeployApp, anyhow::Error> {
1162    get_app_by_id_opt(client, app_id)
1163        .await?
1164        .context("app not found")
1165}
1166
1167/// Retrieve an app by its global id.
1168pub async fn get_app_by_id_opt(
1169    client: &WasmerClient,
1170    app_id: String,
1171) -> Result<Option<DeployApp>, anyhow::Error> {
1172    let app_opt = client
1173        .run_graphql(types::GetDeployAppById::build(
1174            types::GetDeployAppByIdVars {
1175                app_id: app_id.into(),
1176            },
1177        ))
1178        .await?
1179        .app;
1180
1181    if let Some(app) = app_opt {
1182        let app = app.into_deploy_app().context("app conversion failed")?;
1183        Ok(Some(app))
1184    } else {
1185        Ok(None)
1186    }
1187}
1188
1189/// Retrieve an app together with a specific version.
1190pub async fn get_app_with_version_by_id(
1191    client: &WasmerClient,
1192    app_id: String,
1193    version_id: String,
1194) -> Result<(DeployApp, DeployAppVersion), anyhow::Error> {
1195    let res = client
1196        .run_graphql(types::GetDeployAppAndVersionById::build(
1197            types::GetDeployAppAndVersionByIdVars {
1198                app_id: app_id.into(),
1199                version_id: version_id.into(),
1200            },
1201        ))
1202        .await?;
1203
1204    let app = res
1205        .app
1206        .context("app not found")?
1207        .into_deploy_app()
1208        .context("app conversion failed")?;
1209    let version = res
1210        .version
1211        .context("version not found")?
1212        .into_deploy_app_version()
1213        .context("version conversion failed")?;
1214
1215    Ok((app, version))
1216}
1217
1218/// Retrieve an app version by its global id.
1219pub async fn get_app_version_by_id(
1220    client: &WasmerClient,
1221    version_id: String,
1222) -> Result<DeployAppVersion, anyhow::Error> {
1223    client
1224        .run_graphql(types::GetDeployAppVersionById::build(
1225            types::GetDeployAppVersionByIdVars {
1226                version_id: version_id.into(),
1227            },
1228        ))
1229        .await?
1230        .version
1231        .context("app not found")?
1232        .into_deploy_app_version()
1233        .context("app version conversion failed")
1234}
1235
1236pub async fn get_app_version_by_id_with_app(
1237    client: &WasmerClient,
1238    version_id: String,
1239) -> Result<(DeployApp, DeployAppVersion), anyhow::Error> {
1240    let version = client
1241        .run_graphql(types::GetDeployAppVersionById::build(
1242            types::GetDeployAppVersionByIdVars {
1243                version_id: version_id.into(),
1244            },
1245        ))
1246        .await?
1247        .version
1248        .context("app not found")?
1249        .into_deploy_app_version()
1250        .context("app version conversion failed")?;
1251
1252    let app_id = version
1253        .app
1254        .as_ref()
1255        .context("could not load app for version")?
1256        .id
1257        .clone();
1258
1259    let app = get_app_by_id(client, app_id.into_inner()).await?;
1260
1261    Ok((app, version))
1262}
1263
1264/// List all apps that are accessible by the current user.
1265///
1266/// NOTE: this will only include the first pages and does not provide pagination.
1267pub async fn user_apps(
1268    client: &WasmerClient,
1269    sort: types::DeployAppsSortBy,
1270) -> impl futures::Stream<Item = Result<Vec<types::DeployApp>, anyhow::Error>> + '_ {
1271    futures::stream::try_unfold(None, move |cursor| async move {
1272        let user = client
1273            .run_graphql(types::GetCurrentUserWithApps::build(
1274                GetCurrentUserWithAppsVars {
1275                    after: cursor,
1276                    sort: Some(sort),
1277                },
1278            ))
1279            .await?
1280            .viewer
1281            .context("not logged in")?;
1282
1283        let apps: Vec<_> = user
1284            .apps
1285            .edges
1286            .into_iter()
1287            .flatten()
1288            .filter_map(|x| x.node)
1289            .collect();
1290
1291        let cursor = user.apps.page_info.end_cursor;
1292
1293        if apps.is_empty() {
1294            Ok(None)
1295        } else {
1296            Ok(Some((apps, cursor)))
1297        }
1298    })
1299}
1300
1301/// List all apps that are accessible by the current user.
1302pub async fn user_accessible_apps(
1303    client: &WasmerClient,
1304    sort: types::DeployAppsSortBy,
1305) -> Result<
1306    impl futures::Stream<Item = Result<Vec<types::DeployApp>, anyhow::Error>> + '_,
1307    anyhow::Error,
1308> {
1309    let user_apps = user_apps(client, sort).await;
1310
1311    // Get all aps in user-accessible namespaces.
1312    let namespace_res = client
1313        .run_graphql(types::GetCurrentUserWithNamespaces::build(
1314            types::GetCurrentUserWithNamespacesVars {
1315                namespace_role: None,
1316            },
1317        ))
1318        .await?;
1319    let active_user = namespace_res.viewer.context("not logged in")?;
1320    let namespace_names = active_user
1321        .namespaces
1322        .edges
1323        .iter()
1324        .filter_map(|edge| edge.as_ref())
1325        .filter_map(|edge| edge.node.as_ref())
1326        .map(|node| node.name.clone())
1327        .collect::<Vec<_>>();
1328
1329    let mut ns_apps = vec![];
1330    for ns in namespace_names {
1331        let apps = namespace_apps(client, ns, sort).await;
1332        ns_apps.push(apps);
1333    }
1334
1335    Ok((user_apps, ns_apps.merge()).merge())
1336}
1337
1338/// Get apps for a specific namespace.
1339///
1340/// NOTE: only retrieves the first page and does not do pagination.
1341pub async fn namespace_apps(
1342    client: &WasmerClient,
1343    namespace: String,
1344    sort: types::DeployAppsSortBy,
1345) -> impl futures::Stream<Item = Result<Vec<types::DeployApp>, anyhow::Error>> + '_ {
1346    let namespace = namespace.clone();
1347
1348    futures::stream::try_unfold((None, namespace), move |(cursor, namespace)| async move {
1349        let res = client
1350            .run_graphql(types::GetNamespaceApps::build(GetNamespaceAppsVars {
1351                name: namespace.to_string(),
1352                after: cursor,
1353                sort: Some(sort),
1354            }))
1355            .await?;
1356
1357        let ns = res
1358            .get_namespace
1359            .with_context(|| format!("failed to get namespace '{}'", namespace))?;
1360
1361        let apps: Vec<_> = ns
1362            .apps
1363            .edges
1364            .into_iter()
1365            .flatten()
1366            .filter_map(|x| x.node)
1367            .collect();
1368
1369        let cursor = ns.apps.page_info.end_cursor;
1370
1371        if apps.is_empty() {
1372            Ok(None)
1373        } else {
1374            Ok(Some((apps, (cursor, namespace))))
1375        }
1376    })
1377}
1378
1379/// Publish a new app (version).
1380pub async fn publish_deploy_app(
1381    client: &WasmerClient,
1382    vars: PublishDeployAppVars,
1383) -> Result<DeployAppVersion, anyhow::Error> {
1384    let res = client
1385        .run_graphql_raw(types::PublishDeployApp::build(vars))
1386        .await?;
1387
1388    if let Some(app) = res
1389        .data
1390        .and_then(|d| d.publish_deploy_app)
1391        .map(|d| d.deploy_app_version)
1392    {
1393        Ok(app)
1394    } else {
1395        Err(GraphQLApiFailure::from_errors(
1396            "could not publish app",
1397            res.errors,
1398        ))
1399    }
1400}
1401
1402/// Delete an app.
1403pub async fn delete_app(client: &WasmerClient, app_id: String) -> Result<(), anyhow::Error> {
1404    let res = client
1405        .run_graphql_strict(types::DeleteApp::build(types::DeleteAppVars {
1406            app_id: app_id.into(),
1407        }))
1408        .await?
1409        .delete_app
1410        .context("API did not return data for the delete_app mutation")?;
1411
1412    if !res.success {
1413        bail!("App deletion failed for an unknown reason");
1414    }
1415
1416    Ok(())
1417}
1418
1419/// Get all namespaces accessible by the current user.
1420pub async fn user_namespaces(
1421    client: &WasmerClient,
1422) -> Result<Vec<types::Namespace>, anyhow::Error> {
1423    let user = client
1424        .run_graphql(types::GetCurrentUserWithNamespaces::build(
1425            types::GetCurrentUserWithNamespacesVars {
1426                namespace_role: None,
1427            },
1428        ))
1429        .await?
1430        .viewer
1431        .context("not logged in")?;
1432
1433    let ns = user
1434        .namespaces
1435        .edges
1436        .into_iter()
1437        .flatten()
1438        // .filter_map(|x| x)
1439        .filter_map(|x| x.node)
1440        .collect();
1441
1442    Ok(ns)
1443}
1444
1445/// Retrieve a namespace by its name.
1446pub async fn get_namespace(
1447    client: &WasmerClient,
1448    name: String,
1449) -> Result<Option<types::Namespace>, anyhow::Error> {
1450    client
1451        .run_graphql(types::GetNamespace::build(types::GetNamespaceVars { name }))
1452        .await
1453        .map(|x| x.get_namespace)
1454}
1455
1456/// Create a new namespace.
1457pub async fn create_namespace(
1458    client: &WasmerClient,
1459    vars: CreateNamespaceVars,
1460) -> Result<types::Namespace, anyhow::Error> {
1461    client
1462        .run_graphql(types::CreateNamespace::build(vars))
1463        .await?
1464        .create_namespace
1465        .map(|x| x.namespace)
1466        .context("no namespace returned")
1467}
1468
1469/// Retrieve a package by its name.
1470pub async fn get_package(
1471    client: &WasmerClient,
1472    name: String,
1473) -> Result<Option<types::Package>, anyhow::Error> {
1474    client
1475        .run_graphql_strict(types::GetPackage::build(types::GetPackageVars { name }))
1476        .await
1477        .map(|x| x.get_package)
1478}
1479
1480/// Retrieve a package version by its name.
1481pub async fn get_package_version(
1482    client: &WasmerClient,
1483    name: String,
1484    version: String,
1485) -> Result<Option<types::PackageVersionWithPackage>, anyhow::Error> {
1486    client
1487        .run_graphql_strict(types::GetPackageVersion::build(
1488            types::GetPackageVersionVars { name, version },
1489        ))
1490        .await
1491        .map(|x| x.get_package_version)
1492}
1493
1494/// Retrieve package versions for an app.
1495pub async fn get_package_versions(
1496    client: &WasmerClient,
1497    vars: types::AllPackageVersionsVars,
1498) -> Result<PackageVersionConnection, anyhow::Error> {
1499    let res = client
1500        .run_graphql(types::GetAllPackageVersions::build(vars))
1501        .await?;
1502    Ok(res.all_package_versions)
1503}
1504
1505/// Retrieve a package release by hash.
1506pub async fn get_package_release(
1507    client: &WasmerClient,
1508    hash: &str,
1509) -> Result<Option<types::PackageWebc>, anyhow::Error> {
1510    let hash = hash.trim_start_matches("sha256:");
1511    client
1512        .run_graphql_strict(types::GetPackageRelease::build(
1513            types::GetPackageReleaseVars {
1514                hash: hash.to_string(),
1515            },
1516        ))
1517        .await
1518        .map(|x| x.get_package_release)
1519}
1520
1521pub async fn get_package_releases(
1522    client: &WasmerClient,
1523    vars: types::AllPackageReleasesVars,
1524) -> Result<types::PackageWebcConnection, anyhow::Error> {
1525    let res = client
1526        .run_graphql(types::GetAllPackageReleases::build(vars))
1527        .await?;
1528    Ok(res.all_package_releases)
1529}
1530
1531/// Retrieve all versions of a package as a stream that auto-paginates.
1532pub fn get_package_versions_stream(
1533    client: &WasmerClient,
1534    vars: types::AllPackageVersionsVars,
1535) -> impl futures::Stream<Item = Result<Vec<types::PackageVersionWithPackage>, anyhow::Error>> + '_
1536{
1537    futures::stream::try_unfold(
1538        Some(vars),
1539        move |vars: Option<types::AllPackageVersionsVars>| async move {
1540            let vars = match vars {
1541                Some(vars) => vars,
1542                None => return Ok(None),
1543            };
1544
1545            let page = get_package_versions(client, vars.clone()).await?;
1546
1547            let end_cursor = page.page_info.end_cursor;
1548
1549            let items = page
1550                .edges
1551                .into_iter()
1552                .filter_map(|x| x.and_then(|x| x.node))
1553                .collect::<Vec<_>>();
1554
1555            let new_vars = end_cursor.map(|cursor| types::AllPackageVersionsVars {
1556                after: Some(cursor),
1557                ..vars
1558            });
1559
1560            Ok(Some((items, new_vars)))
1561        },
1562    )
1563}
1564
1565/// Retrieve all package releases as a stream.
1566pub fn get_package_releases_stream(
1567    client: &WasmerClient,
1568    vars: types::AllPackageReleasesVars,
1569) -> impl futures::Stream<Item = Result<Vec<types::PackageWebc>, anyhow::Error>> + '_ {
1570    futures::stream::try_unfold(
1571        Some(vars),
1572        move |vars: Option<types::AllPackageReleasesVars>| async move {
1573            let vars = match vars {
1574                Some(vars) => vars,
1575                None => return Ok(None),
1576            };
1577
1578            let page = get_package_releases(client, vars.clone()).await?;
1579
1580            let end_cursor = page.page_info.end_cursor;
1581
1582            let items = page
1583                .edges
1584                .into_iter()
1585                .filter_map(|x| x.and_then(|x| x.node))
1586                .collect::<Vec<_>>();
1587
1588            let new_vars = end_cursor.map(|cursor| types::AllPackageReleasesVars {
1589                after: Some(cursor),
1590                ..vars
1591            });
1592
1593            Ok(Some((items, new_vars)))
1594        },
1595    )
1596}
1597
/// The kind of deploy-config token to request from the backend.
///
/// The type is a plain field-less enum, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TokenKind {
    /// NOTE(review): presumably a token meant for SSH access — confirm
    /// against the backend API.
    SSH,
}
1602
1603pub async fn generate_deploy_config_token_raw(
1604    client: &WasmerClient,
1605    token_kind: TokenKind,
1606) -> Result<String, anyhow::Error> {
1607    let res = client
1608        .run_graphql(types::GenerateDeployConfigToken::build(
1609            types::GenerateDeployConfigTokenVars {
1610                input: match token_kind {
1611                    TokenKind::SSH => "{}".to_string(),
1612                },
1613            },
1614        ))
1615        .await?;
1616
1617    res.generate_deploy_config_token
1618        .map(|x| x.token)
1619        .context("no token returned")
1620}
1621
/// Get pages of logs associated with an application that lie within the
/// specified date range.
///
/// Each stream item is one non-empty page of log entries. The stream state
/// is the timestamp to start the next request from: it begins at `start`
/// and, after every page, moves to just past the timestamp of the last entry
/// of that page.
///
/// * `name`, `owner`, `tag` — sent as the `name`, `owner` and `version`
///   query variables.
/// * `start`, `end` — sent as `starting_from` / `until`, converted with
///   [`unix_timestamp`].
/// * `watch` — when `true`, an empty page does not end the stream; the same
///   request is retried after a one second pause. When `false`, the first
///   empty page ends the stream.
/// * `streams`, `request_id`, `instance_ids` — forwarded unchanged as query
///   variables.
///
/// # Errors
///
/// A stream item is an error if the query fails, if the response contains no
/// app version (`"app version not found"`), or if the timestamp of the last
/// log entry cannot be converted into an [`OffsetDateTime`].
// NOTE: this is not public due to severe usability issues.
// The stream can loop forever due to re-fetching the same logs over and over.
#[tracing::instrument(skip_all, level = "debug")]
#[allow(clippy::let_with_type_underscore)]
#[allow(clippy::too_many_arguments)]
fn get_app_logs(
    client: &WasmerClient,
    name: String,
    owner: String,
    tag: Option<String>,
    start: OffsetDateTime,
    end: Option<OffsetDateTime>,
    watch: bool,
    streams: Option<Vec<LogStream>>,
    request_id: Option<String>,
    instance_ids: Option<Vec<String>>,
) -> impl futures::Stream<Item = Result<Vec<Log>, anyhow::Error>> + '_ {
    // Note: the backend will limit responses to a certain number of log
    // messages, so we use try_unfold() to keep calling it until we stop getting
    // new log messages.
    // Capture the span created by `#[tracing::instrument]` so that every
    // future produced by the stream can be instrumented with it.
    let span = tracing::Span::current();

    futures::stream::try_unfold(start, move |start| {
        // The closure runs once per stream item, so the owned inputs are
        // cloned into fresh query variables on every call.
        let variables = types::GetDeployAppLogsVars {
            name: name.clone(),
            owner: owner.clone(),
            version: tag.clone(),
            first: Some(100),
            starting_from: unix_timestamp(start),
            until: end.map(unix_timestamp),
            streams: streams.clone(),
            request_id: request_id.clone(),
            instance_ids: instance_ids.clone(),
        };

        let fut = async move {
            // Loops only in watch mode, re-issuing the same request until a
            // non-empty page arrives.
            loop {
                let deploy_app_version = client
                    .run_graphql(types::GetDeployAppLogs::build(variables.clone()))
                    .await?
                    .get_deploy_app_version
                    .context("app version not found")?;

                // Skip empty edges and edges without a node.
                let page: Vec<_> = deploy_app_version
                    .logs
                    .edges
                    .into_iter()
                    .flatten()
                    .filter_map(|edge| edge.node)
                    .collect();

                if page.is_empty() {
                    if watch {
                        /*
                         * [TODO]: The resolution here should be configurable.
                         */

                        // On wasm32-unknown the pause is a blocking thread
                        // sleep; everywhere else it is an async tokio sleep.
                        #[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
                        std::thread::sleep(Duration::from_secs(1));

                        #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
                        tokio::time::sleep(Duration::from_secs(1)).await;

                        continue;
                    }

                    // Not watching: an empty page ends the stream.
                    break Ok(None);
                } else {
                    let last_message = page.last().expect("The page is non-empty");
                    let timestamp = last_message.timestamp;
                    // The log timestamp is interpreted as nanoseconds since
                    // the unix epoch.
                    let timestamp = OffsetDateTime::from_unix_timestamp_nanos(timestamp as i128)
                        .with_context(|| {
                            format!("Unable to interpret {timestamp} as a unix timestamp")
                        })?;

                    // FIXME: We need a better way to tell the backend "give me the
                    // next set of logs". Shifting the start forward could
                    // theoretically mean we miss messages if multiple log
                    // messages arrived within the skipped interval and the
                    // page ended midway.

                    // NOTE: adding 1 microsecond (1_000 nanoseconds) to the
                    // timestamp to avoid fetching the last message again.
                    let next_timestamp = timestamp + Duration::from_nanos(1_000);

                    break Ok(Some((page, next_timestamp)));
                }
            }
        };

        fut.instrument(span.clone())
    })
}
1716
1717/// Get pages of logs associated with an application that lie within the
1718/// specified date range.
1719///
1720/// In contrast to [`get_app_logs`], this function collects the stream into a
1721/// final vector.
1722#[tracing::instrument(skip_all, level = "debug")]
1723#[allow(clippy::let_with_type_underscore)]
1724#[allow(clippy::too_many_arguments)]
1725pub async fn get_app_logs_paginated(
1726    client: &WasmerClient,
1727    name: String,
1728    owner: String,
1729    tag: Option<String>,
1730    start: OffsetDateTime,
1731    end: Option<OffsetDateTime>,
1732    watch: bool,
1733    streams: Option<Vec<LogStream>>,
1734) -> impl futures::Stream<Item = Result<Vec<Log>, anyhow::Error>> + '_ {
1735    let stream = get_app_logs(
1736        client, name, owner, tag, start, end, watch, streams, None, None,
1737    );
1738
1739    stream.map(|res| {
1740        let mut logs = Vec::new();
1741        let mut hasher = HashSet::new();
1742        let mut page = res?;
1743
1744        // Prevent duplicates.
1745        // TODO: don't clone the message, just hash it.
1746        page.retain(|log| hasher.insert((log.message.clone(), log.timestamp.round() as i128)));
1747
1748        logs.extend(page);
1749
1750        Ok(logs)
1751    })
1752}
1753
1754/// Get pages of logs associated with an application that lie within the
1755/// specified date range with a specific instance identifier.
1756///
1757/// In contrast to [`get_app_logs`], this function collects the stream into a
1758/// final vector.
1759#[tracing::instrument(skip_all, level = "debug")]
1760#[allow(clippy::let_with_type_underscore)]
1761#[allow(clippy::too_many_arguments)]
1762pub async fn get_app_logs_paginated_filter_instance(
1763    client: &WasmerClient,
1764    name: String,
1765    owner: String,
1766    tag: Option<String>,
1767    start: OffsetDateTime,
1768    end: Option<OffsetDateTime>,
1769    watch: bool,
1770    streams: Option<Vec<LogStream>>,
1771    instance_ids: Vec<String>,
1772) -> impl futures::Stream<Item = Result<Vec<Log>, anyhow::Error>> + '_ {
1773    let stream = get_app_logs(
1774        client,
1775        name,
1776        owner,
1777        tag,
1778        start,
1779        end,
1780        watch,
1781        streams,
1782        None,
1783        Some(instance_ids),
1784    );
1785
1786    stream.map(|res| {
1787        let mut logs = Vec::new();
1788        let mut hasher = HashSet::new();
1789        let mut page = res?;
1790
1791        // Prevent duplicates.
1792        // TODO: don't clone the message, just hash it.
1793        page.retain(|log| hasher.insert((log.message.clone(), log.timestamp.round() as i128)));
1794
1795        logs.extend(page);
1796
1797        Ok(logs)
1798    })
1799}
1800
1801/// Get pages of logs associated with an specific request for application that lie within the
1802/// specified date range.
1803///
1804/// In contrast to [`get_app_logs`], this function collects the stream into a
1805/// final vector.
1806#[tracing::instrument(skip_all, level = "debug")]
1807#[allow(clippy::let_with_type_underscore)]
1808#[allow(clippy::too_many_arguments)]
1809pub async fn get_app_logs_paginated_filter_request(
1810    client: &WasmerClient,
1811    name: String,
1812    owner: String,
1813    tag: Option<String>,
1814    start: OffsetDateTime,
1815    end: Option<OffsetDateTime>,
1816    watch: bool,
1817    streams: Option<Vec<LogStream>>,
1818    request_id: String,
1819) -> impl futures::Stream<Item = Result<Vec<Log>, anyhow::Error>> + '_ {
1820    let stream = get_app_logs(
1821        client,
1822        name,
1823        owner,
1824        tag,
1825        start,
1826        end,
1827        watch,
1828        streams,
1829        Some(request_id),
1830        None,
1831    );
1832
1833    stream.map(|res| {
1834        let mut logs = Vec::new();
1835        let mut hasher = HashSet::new();
1836        let mut page = res?;
1837
1838        // Prevent duplicates.
1839        // TODO: don't clone the message, just hash it.
1840        page.retain(|log| hasher.insert((log.message.clone(), log.timestamp.round() as i128)));
1841
1842        logs.extend(page);
1843
1844        Ok(logs)
1845    })
1846}
1847
1848/// Retrieve a domain by its name.
1849///
1850/// Specify with_records to also retrieve all records for the domain.
1851pub async fn get_domain(
1852    client: &WasmerClient,
1853    domain: String,
1854) -> Result<Option<types::DnsDomain>, anyhow::Error> {
1855    let vars = types::GetDomainVars { domain };
1856
1857    let opt = client
1858        .run_graphql(types::GetDomain::build(vars))
1859        .await
1860        .map_err(anyhow::Error::from)?
1861        .get_domain;
1862    Ok(opt)
1863}
1864
1865/// Retrieve a domain by its name.
1866///
1867/// Specify with_records to also retrieve all records for the domain.
1868pub async fn get_domain_zone_file(
1869    client: &WasmerClient,
1870    domain: String,
1871) -> Result<Option<types::DnsDomainWithZoneFile>, anyhow::Error> {
1872    let vars = types::GetDomainVars { domain };
1873
1874    let opt = client
1875        .run_graphql(types::GetDomainWithZoneFile::build(vars))
1876        .await
1877        .map_err(anyhow::Error::from)?
1878        .get_domain;
1879    Ok(opt)
1880}
1881
1882/// Retrieve a domain by its name, along with all it's records.
1883pub async fn get_domain_with_records(
1884    client: &WasmerClient,
1885    domain: String,
1886) -> Result<Option<types::DnsDomainWithRecords>, anyhow::Error> {
1887    let vars = types::GetDomainVars { domain };
1888
1889    let opt = client
1890        .run_graphql(types::GetDomainWithRecords::build(vars))
1891        .await
1892        .map_err(anyhow::Error::from)?
1893        .get_domain;
1894    Ok(opt)
1895}
1896
1897/// Register a new domain
1898pub async fn register_domain(
1899    client: &WasmerClient,
1900    name: String,
1901    namespace: Option<String>,
1902    import_records: Option<bool>,
1903) -> Result<types::DnsDomain, anyhow::Error> {
1904    let vars = types::RegisterDomainVars {
1905        name,
1906        namespace,
1907        import_records,
1908    };
1909    let opt = client
1910        .run_graphql_strict(types::RegisterDomain::build(vars))
1911        .await
1912        .map_err(anyhow::Error::from)?
1913        .register_domain
1914        .context("Domain registration failed")?
1915        .domain
1916        .context("Domain registration failed, no associatede domain found.")?;
1917    Ok(opt)
1918}
1919
1920/// Retrieve all DNS records.
1921///
1922/// NOTE: this is a privileged operation that requires extra permissions.
1923pub async fn get_all_dns_records(
1924    client: &WasmerClient,
1925    vars: types::GetAllDnsRecordsVariables,
1926) -> Result<types::DnsRecordConnection, anyhow::Error> {
1927    client
1928        .run_graphql_strict(types::GetAllDnsRecords::build(vars))
1929        .await
1930        .map_err(anyhow::Error::from)
1931        .map(|x| x.get_all_dnsrecords)
1932}
1933
1934/// Retrieve all DNS domains.
1935pub async fn get_all_domains(
1936    client: &WasmerClient,
1937    vars: types::GetAllDomainsVariables,
1938) -> Result<Vec<DnsDomain>, anyhow::Error> {
1939    let connection = client
1940        .run_graphql_strict(types::GetAllDomains::build(vars))
1941        .await
1942        .map_err(anyhow::Error::from)
1943        .map(|x| x.get_all_domains)
1944        .context("no domains returned")?;
1945    Ok(connection
1946        .edges
1947        .into_iter()
1948        .flatten()
1949        .filter_map(|x| x.node)
1950        .collect())
1951}
1952
1953/// Retrieve a domain by its name.
1954///
1955/// Specify with_records to also retrieve all records for the domain.
1956pub fn get_all_dns_records_stream(
1957    client: &WasmerClient,
1958    vars: types::GetAllDnsRecordsVariables,
1959) -> impl futures::Stream<Item = Result<Vec<types::DnsRecord>, anyhow::Error>> + '_ {
1960    futures::stream::try_unfold(
1961        Some(vars),
1962        move |vars: Option<types::GetAllDnsRecordsVariables>| async move {
1963            let vars = match vars {
1964                Some(vars) => vars,
1965                None => return Ok(None),
1966            };
1967
1968            let page = get_all_dns_records(client, vars.clone()).await?;
1969
1970            let end_cursor = page.page_info.end_cursor;
1971
1972            let items = page
1973                .edges
1974                .into_iter()
1975                .filter_map(|x| x.and_then(|x| x.node))
1976                .collect::<Vec<_>>();
1977
1978            let new_vars = end_cursor.map(|c| types::GetAllDnsRecordsVariables {
1979                after: Some(c),
1980                ..vars
1981            });
1982
1983            Ok(Some((items, new_vars)))
1984        },
1985    )
1986}
1987
1988pub async fn purge_cache_for_app_version(
1989    client: &WasmerClient,
1990    vars: types::PurgeCacheForAppVersionVars,
1991) -> Result<(), anyhow::Error> {
1992    client
1993        .run_graphql_strict(types::PurgeCacheForAppVersion::build(vars))
1994        .await
1995        .map_err(anyhow::Error::from)
1996        .map(|x| x.purge_cache_for_app_version)
1997        .context("backend did not return data")?;
1998
1999    Ok(())
2000}
2001
2002/// Convert a [`OffsetDateTime`] to a unix timestamp that the WAPM backend
2003/// understands.
2004fn unix_timestamp(ts: OffsetDateTime) -> f64 {
2005    let nanos_per_second = 1_000_000_000;
2006    let timestamp = ts.unix_timestamp_nanos();
2007    let nanos = timestamp % nanos_per_second;
2008    let secs = timestamp / nanos_per_second;
2009
2010    (secs as f64) + (nanos as f64 / nanos_per_second as f64)
2011}
2012
2013/// Publish a new app (version).
2014pub async fn upsert_domain_from_zone_file(
2015    client: &WasmerClient,
2016    zone_file_contents: String,
2017    delete_missing_records: bool,
2018) -> Result<DnsDomain, anyhow::Error> {
2019    let vars = UpsertDomainFromZoneFileVars {
2020        zone_file: zone_file_contents,
2021        delete_missing_records: Some(delete_missing_records),
2022    };
2023    let res = client
2024        .run_graphql_strict(types::UpsertDomainFromZoneFile::build(vars))
2025        .await?;
2026
2027    let domain = res
2028        .upsert_domain_from_zone_file
2029        .context("Upserting domain from zonefile failed")?
2030        .domain;
2031
2032    Ok(domain)
2033}