warg_client/
lib.rs

1//! A client library for Warg component registries.
2
3#![deny(missing_docs)]
4use crate::storage::PackageInfo;
5
6use anyhow::{anyhow, Context, Result};
7use bytes::Bytes;
8use futures_util::{Stream, StreamExt, TryStreamExt};
9use indexmap::{IndexMap, IndexSet};
10use reqwest::{Body, IntoUrl};
11use secrecy::Secret;
12use semver::{Version, VersionReq};
13use std::cmp::Ordering;
14use std::fs;
15use std::str::FromStr;
16use std::{borrow::Cow, path::PathBuf, time::Duration};
17use storage::{
18    ContentStorage, FileSystemContentStorage, FileSystemNamespaceMapStorage,
19    FileSystemRegistryStorage, NamespaceMapStorage, PublishInfo, RegistryDomain, RegistryStorage,
20};
21use thiserror::Error;
22use tokio_util::io::ReaderStream;
23use warg_api::v1::{
24    fetch::{FetchError, FetchLogsRequest},
25    package::{
26        MissingContent, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest,
27        UploadEndpoint,
28    },
29    proof::{ConsistencyRequest, InclusionRequest},
30};
31use warg_crypto::hash::Sha256;
32use warg_crypto::{hash::AnyHash, signing, Encode, Signable};
33use warg_protocol::package::ReleaseState;
34use warg_protocol::{
35    operator, package,
36    registry::{LogId, LogLeaf, PackageName, RecordId, RegistryLen, TimestampedCheckpoint},
37    PublishedProtoEnvelope,
38};
39use wasm_compose::graph::{CompositionGraph, EncodeOptions, ExportIndex, InstanceId};
40
41#[cfg(feature = "keyring")]
42pub mod keyring;
43
44pub mod api;
45mod config;
46/// Tools for locking and bundling components
47pub mod depsolve;
48use depsolve::{Bundler, LockListBuilder};
49/// Tools for semver
50pub mod version_util;
51use version_util::{kindless_name, locked_package, versioned_package, Import, ImportKind};
52pub mod lock;
53mod registry_url;
54pub mod storage;
55pub use self::config::*;
56pub use self::registry_url::RegistryUrl;
57
/// Polling interval handed to `wait_for_publish` when a publish attempt has to
/// wait for a conflicting pending record to finish.
const DEFAULT_WAIT_INTERVAL: Duration = Duration::from_secs(1);

/// For Bytecode Alliance projects, the default registry is set to `bytecodealliance.org`.
/// The `.well-known` config path may resolve to another domain where the registry is hosted.
pub const DEFAULT_REGISTRY: &str = "bytecodealliance.org";
63
64/// A client for a Warg registry.
65pub struct Client<R, C, N>
66where
67    R: RegistryStorage,
68    C: ContentStorage,
69    N: NamespaceMapStorage,
70{
71    registry: R,
72    content: C,
73    namespace_map: N,
74    api: api::Client,
75    ignore_federation_hints: bool,
76    disable_auto_accept_federation_hints: bool,
77    disable_auto_package_init: bool,
78    disable_interactive: bool,
79    keyring_backend: Option<String>,
80    keys: IndexSet<String>,
81}
82
83impl<R: RegistryStorage, C: ContentStorage, N: NamespaceMapStorage> Client<R, C, N> {
84    /// Creates a new client for the given URL, registry storage, and
85    /// content storage.
86    #[allow(clippy::too_many_arguments)]
87    pub fn new(
88        url: impl IntoUrl,
89        registry: R,
90        content: C,
91        namespace_map: N,
92        auth_token: Option<Secret<String>>,
93        ignore_federation_hints: bool,
94        disable_auto_accept_federation_hints: bool,
95        disable_auto_package_init: bool,
96        disable_interactive: bool,
97        keyring_backend: Option<String>,
98        keys: IndexSet<String>,
99    ) -> ClientResult<Self> {
100        let api = api::Client::new(url, auth_token)?;
101        Ok(Self {
102            registry,
103            content,
104            namespace_map,
105            api,
106            ignore_federation_hints,
107            disable_auto_accept_federation_hints,
108            disable_auto_package_init,
109            disable_interactive,
110            keyring_backend,
111            keys,
112        })
113    }
114
115    /// Gets the URL of the client.
116    pub fn url(&self) -> &RegistryUrl {
117        self.api.url()
118    }
119
120    /// Gets the registry storage used by the client.
121    pub fn registry(&self) -> &R {
122        &self.registry
123    }
124
125    /// Gets the content storage used by the client.
126    pub fn content(&self) -> &C {
127        &self.content
128    }
129
130    /// Gets the namespace map
131    pub fn namespace_map(&self) -> &N {
132        &self.namespace_map
133    }
134
135    /// Get warg registry domain.
136    pub async fn get_warg_registry(
137        &self,
138        namespace: &str,
139    ) -> Result<Option<RegistryDomain>, ClientError> {
140        let operator = self
141            .registry()
142            .load_operator(Some(&RegistryDomain::from_str(namespace)?))
143            .await?;
144        if let Some(op) = operator {
145            match op.state.namespace_state(namespace) {
146                Some(warg_protocol::operator::NamespaceState::Imported { registry }) => {
147                    return Ok(Some(RegistryDomain::from_str(registry)?));
148                }
149                Some(warg_protocol::operator::NamespaceState::Defined) => {
150                    return Ok(None);
151                }
152                _ => (),
153            }
154        };
155        let nm_map = self.namespace_map.load_namespace_map().await?;
156        Ok(nm_map.and_then(|nm_map| {
157            nm_map
158                .get(namespace)
159                .map(|domain| RegistryDomain::from_str(domain).unwrap())
160        }))
161    }
162
163    /// Stores namespace mapping in local storage
164    pub async fn store_namespace(
165        &self,
166        namespace: String,
167        registry_domain: RegistryDomain,
168    ) -> Result<()> {
169        self.namespace_map
170            .store_namespace(namespace, registry_domain)
171            .await?;
172        Ok(())
173    }
174
175    /// Resets the namespace map
176    pub async fn reset_namespaces(&self) -> Result<()> {
177        self.namespace_map.reset_namespaces().await?;
178        Ok(())
179    }
180
181    /// Reset client storage for the registry.
182    pub async fn reset_registry(&self) -> ClientResult<()> {
183        tracing::info!("resetting registry local state");
184        self.registry
185            .reset(true)
186            .await
187            .or(Err(ClientError::ResettingRegistryLocalStateFailed))
188    }
189
190    /// Clear client content cache.
191    pub async fn clear_content_cache(&self) -> ClientResult<()> {
192        tracing::info!("removing content cache");
193        self.content
194            .clear()
195            .await
196            .or(Err(ClientError::ClearContentCacheFailed))
197    }
198
199    /// Locks component
200    pub async fn lock_component(&self, info: &PackageInfo) -> ClientResult<Vec<u8>> {
201        let mut builder = LockListBuilder::default();
202        builder.build_list(self, info).await?;
203        let top = Import {
204            name: format!("{}:{}", info.name.namespace(), info.name.name()),
205            req: VersionReq::STAR,
206            kind: ImportKind::Unlocked,
207        };
208        builder.lock_list.insert(top);
209        let mut composer = CompositionGraph::new();
210        let mut handled = IndexMap::<String, InstanceId>::new();
211        for package in builder.lock_list {
212            let name = package.name.clone();
213            let version = package.req;
214            let id = PackageName::new(name)?;
215            let info = self
216                .registry()
217                .load_package(self.get_warg_registry(id.namespace()).await?.as_ref(), &id)
218                .await?;
219            if let Some(inf) = info {
220                let release = if version != VersionReq::STAR {
221                    inf.state
222                        .releases()
223                        .filter(|r| version.matches(&r.version))
224                        .last()
225                } else {
226                    inf.state.releases().last()
227                };
228
229                if let Some(r) = release {
230                    let state = &r.state;
231                    if let ReleaseState::Released { content } = state {
232                        let locked_package = locked_package(&package.name, r, content);
233                        let path = self.content().content_location(content);
234                        if let Some(p) = path {
235                            let bytes = fs::read(&p).map_err(|_| ClientError::ContentNotFound {
236                                digest: content.clone(),
237                            })?;
238
239                            let read_digest =
240                                AnyHash::from_str(&format!("sha256:{}", sha256::digest(bytes)))
241                                    .unwrap();
242                            if content != &read_digest {
243                                return Err(ClientError::IncorrectContent {
244                                    digest: read_digest,
245                                    expected: content.clone(),
246                                });
247                            }
248                            let component =
249                                wasm_compose::graph::Component::from_file(&locked_package, p)?;
250                            let component_id = if let Some((id, _)) =
251                                composer.get_component_by_name(&locked_package)
252                            {
253                                id
254                            } else {
255                                composer.add_component(component)?
256                            };
257                            let instance_id = composer.instantiate(component_id)?;
258                            let added = composer.get_component(component_id);
259                            handled.insert(versioned_package(&package.name, version), instance_id);
260                            let mut args = Vec::new();
261                            if let Some(added) = added {
262                                for (index, name, _) in added.imports() {
263                                    let iid = handled.get(kindless_name(name));
264                                    if let Some(arg) = iid {
265                                        args.push((arg, index));
266                                    }
267                                }
268                            }
269                            for arg in args {
270                                composer.connect(
271                                    *arg.0,
272                                    None::<ExportIndex>,
273                                    instance_id,
274                                    arg.1,
275                                )?;
276                            }
277                        }
278                    }
279                }
280            }
281        }
282        let final_name = &format!("{}:{}", info.name.namespace(), &info.name.name());
283        let id = handled.get(final_name);
284        let options = EncodeOptions {
285            export: id.copied(),
286            ..Default::default()
287        };
288        let locked = composer.encode(options)?;
289        fs::write("./locked.wasm", locked.as_slice()).map_err(|e| ClientError::Other(e.into()))?;
290        Ok(locked)
291    }
292
293    /// Bundles component
294    pub async fn bundle_component(&self, info: &PackageInfo) -> ClientResult<Vec<u8>> {
295        let mut bundler = Bundler::new(self);
296        let path = PathBuf::from("./locked.wasm");
297        let locked = if !path.is_file() {
298            self.lock_component(info).await?
299        } else {
300            fs::read("./locked.wasm").map_err(|e| ClientError::Other(e.into()))?
301        };
302        let bundled = bundler.parse(&locked).await?;
303        fs::write("./bundled.wasm", bundled.as_slice())
304            .map_err(|e| ClientError::Other(e.into()))?;
305        Ok(bundled.as_slice().to_vec())
306    }
307
308    /// Submits the publish information in client storage.
309    ///
310    /// If there's no publishing information in client storage, an error is returned.
311    ///
312    /// Returns the identifier of the record that was published.
313    ///
314    /// Use `wait_for_publish` to wait for the record to transition to the `published` state.
315    pub async fn publish(&self, signing_key: &signing::PrivateKey) -> ClientResult<RecordId> {
316        let info = self
317            .registry
318            .load_publish()
319            .await?
320            .ok_or(ClientError::NotPublishing)?;
321
322        let res = self.publish_with_info(signing_key, info).await;
323        self.registry.store_publish(None).await?;
324        res
325    }
326
327    /// Submits the provided publish information or, if not provided, loads from client
328    /// storage. Uses the keyring to retrieve a key and sign.
329    ///
330    /// If there's no publishing information in client storage, an error is returned.
331    ///
332    /// Returns the identifier of the record that was published.
333    ///
334    /// Use `wait_for_publish` to wait for the record to transition to the `published` state.
335    #[cfg(feature = "keyring")]
336    pub async fn sign_with_keyring_and_publish(
337        &self,
338        publish_info: Option<PublishInfo>,
339    ) -> ClientResult<RecordId> {
340        let publish_info = if let Some(publish_info) = publish_info {
341            publish_info
342        } else {
343            self.registry
344                .load_publish()
345                .await?
346                .ok_or(ClientError::NotPublishing)?
347        };
348
349        let registry_domain = self
350            .get_warg_registry(publish_info.name.namespace())
351            .await?;
352        let signing_key = keyring::Keyring::new(
353            self.keyring_backend
354                .as_deref()
355                .unwrap_or(keyring::Keyring::DEFAULT_BACKEND),
356        )?
357        .get_signing_key(
358            registry_domain.map(|domain| domain.to_string()).as_deref(),
359            &self.keys,
360            Some(&self.url().to_string()),
361        )?;
362
363        let res = self.publish_with_info(&signing_key, publish_info).await;
364        self.registry.store_publish(None).await?;
365        res
366    }
367
368    /// Submits the provided publish information.
369    ///
370    /// Any publish information in client storage is ignored.
371    ///
372    /// Returns the identifier of the record that was published.
373    ///
374    /// Use `wait_for_publish` to wait for the record to transition to the `published` state.
375    pub async fn publish_with_info(
376        &self,
377        signing_key: &signing::PrivateKey,
378        publish_info: PublishInfo,
379    ) -> ClientResult<RecordId> {
380        if publish_info.entries.is_empty() {
381            return Err(ClientError::NothingToPublish {
382                name: publish_info.name.clone(),
383            });
384        }
385
386        tracing::info!(
387            "publishing {new}package `{name}`",
388            name = publish_info.name,
389            new = if publish_info.initializing() {
390                "new "
391            } else {
392                ""
393            }
394        );
395        tracing::debug!("entries: {:?}", publish_info.entries);
396
397        let mut accepted_prompt_to_initialize = false;
398
399        let mut init_record_id: Option<RecordId> = None;
400
401        let (package, record) = loop {
402            let mut info = publish_info.clone();
403
404            let mut initializing = info.initializing();
405
406            let package = match self.fetch_package(&info.name).await {
407                Ok(package) => {
408                    if initializing {
409                        return Err(ClientError::CannotInitializePackage {
410                            name: package.name,
411                            init_record_id,
412                        });
413                    } else if info.head.is_none() {
414                        // If we're not initializing the package and a head was not explicitly specified,
415                        // set to the latest known head.
416                        info.head = package.state.head().as_ref().map(|h| h.digest.clone());
417                    }
418                    package
419                }
420                Err(ClientError::PackageDoesNotExist {
421                    name,
422                    has_auth_token,
423                }) => {
424                    if !initializing {
425                        if !self.disable_auto_package_init {
426                            info.entries.insert(0, crate::storage::PublishEntry::Init);
427                            initializing = true;
428                            accepted_prompt_to_initialize = true;
429                        } else {
430                            if self.disable_interactive || cfg!(not(feature = "cli-interactive")) {
431                                return Err(ClientError::MustInitializePackage {
432                                    name,
433                                    has_auth_token,
434                                });
435                            }
436
437                            #[cfg(feature = "cli-interactive")]
438                            {
439                                use crate::storage::PublishEntry;
440                                use dialoguer::{theme::ColorfulTheme, Confirm};
441
442                                if accepted_prompt_to_initialize
443                                    || Confirm::with_theme(&ColorfulTheme::default())
444                                        .with_prompt(format!(
445                                            "Package `{package_name}` was not found.
446    If it exists, you may not have access.
447    Attempt to create `{package_name}` and publish the release y/N\n",
448                                            package_name = &info.name,
449                                        ))
450                                        .default(false)
451                                        .interact()
452                                        .unwrap()
453                                {
454                                    info.entries.insert(0, PublishEntry::Init);
455                                    initializing = true;
456                                    accepted_prompt_to_initialize = true;
457                                } else {
458                                    return Err(ClientError::MustInitializePackage {
459                                        name,
460                                        has_auth_token,
461                                    });
462                                }
463                            }
464                        }
465                    }
466                    PackageInfo::new(info.name.clone())
467                }
468                err => err?,
469            };
470            let registry_domain = self.get_warg_registry(package.name.namespace()).await?;
471
472            let log_id = LogId::package_log::<Sha256>(&package.name);
473            let record = info.finalize(signing_key)?;
474            let record_id = RecordId::package_record::<Sha256>(&record);
475            let record = match self
476                .api
477                .publish_package_record(
478                    registry_domain.as_ref(),
479                    &log_id,
480                    PublishRecordRequest {
481                        package_name: Cow::Borrowed(&package.name),
482                        record: Cow::Owned(record.into()),
483                        content_sources: Default::default(),
484                    },
485                )
486                .await
487            {
488                Ok(record) => Ok(record),
489                Err(api::ClientError::Package(PackageError::Rejection(reason))) => {
490                    Err(ClientError::PublishRejected {
491                        name: package.name.clone(),
492                        reason,
493                        record_id,
494                    })
495                }
496                Err(api::ClientError::Package(PackageError::Unauthorized(reason))) => {
497                    Err(ClientError::Unauthorized(reason))
498                }
499                Err(api::ClientError::Package(PackageError::ConflictPendingPublish(
500                    pending_record_id,
501                ))) => {
502                    // conflicting pending publish succeeds,
503                    tracing::info!("waiting for conflicting publish to complete");
504                    // check registry for federated namespace mapping, if initializing
505                    if initializing {
506                        match self.fetch_package(&package.name).await {
507                            Ok(_) => {}
508                            // may not exist until conflicting publish completes
509                            Err(ClientError::PackageDoesNotExist { .. }) => {}
510                            Err(err) => return Err(err),
511                        }
512                        init_record_id = Some(pending_record_id.clone());
513                    }
514                    self.wait_for_publish(&package.name, &pending_record_id, DEFAULT_WAIT_INTERVAL)
515                        .await
516                        .map_err(|err| match err {
517                            ClientError::PackageMissingContent => {
518                                ClientError::ConflictPendingPublish {
519                                    name: package.name.clone(),
520                                    record_id,
521                                    pending_record_id,
522                                }
523                            }
524                            err => err,
525                        })?;
526
527                    continue;
528                }
529                Err(e) => Err(ClientError::translate_log_not_found(
530                    e,
531                    self.api.auth_token().is_some(),
532                    |id| {
533                        if id == &log_id {
534                            Some(package.name.clone())
535                        } else {
536                            None
537                        }
538                    },
539                )),
540            }?;
541
542            break (package, record);
543        };
544
545        // TODO: parallelize this
546        for (digest, MissingContent { upload }) in record.missing_content() {
547            // Upload the missing content, if the registry supports it
548            let Some(UploadEndpoint::Http {
549                method,
550                url,
551                headers,
552            }) = upload.first()
553            else {
554                continue;
555            };
556
557            self.api
558                .upload_content(
559                    method,
560                    url,
561                    headers,
562                    Body::wrap_stream(self.content.load_content(digest).await?.ok_or_else(
563                        || ClientError::ContentNotFound {
564                            digest: digest.clone(),
565                        },
566                    )?),
567                )
568                .await
569                .map_err(|e| match e {
570                    api::ClientError::Package(PackageError::Rejection(reason)) => {
571                        ClientError::PublishRejected {
572                            name: package.name.clone(),
573                            record_id: record.record_id.clone(),
574                            reason,
575                        }
576                    }
577                    api::ClientError::Package(PackageError::Unauthorized(reason)) => {
578                        ClientError::Unauthorized(reason)
579                    }
580                    _ => e.into(),
581                })?;
582        }
583
584        Ok(record.record_id)
585    }
586
587    /// Waits for a package record to transition to the `published` state.
588    ///
589    /// The `interval` is the amount of time to wait between checks.
590    ///
591    /// Returns an error if the package record was rejected.
592    pub async fn wait_for_publish(
593        &self,
594        package: &PackageName,
595        record_id: &RecordId,
596        interval: Duration,
597    ) -> ClientResult<()> {
598        let registry_domain = self.get_warg_registry(package.namespace()).await?;
599        let log_id = LogId::package_log::<Sha256>(package);
600        let mut current = self
601            .get_package_record(registry_domain.as_ref(), package, &log_id, record_id)
602            .await?;
603
604        loop {
605            match current.state {
606                PackageRecordState::Sourcing { .. } => {
607                    return Err(ClientError::PackageMissingContent);
608                }
609                PackageRecordState::Published { .. } => {
610                    self.fetch_package(package).await?;
611                    return Ok(());
612                }
613                PackageRecordState::Rejected { reason } => {
614                    return Err(ClientError::PublishRejected {
615                        name: package.clone(),
616                        record_id: record_id.clone(),
617                        reason,
618                    });
619                }
620                PackageRecordState::Processing => {
621                    tokio::time::sleep(interval).await;
622                    current = self
623                        .get_package_record(registry_domain.as_ref(), package, &log_id, record_id)
624                        .await?;
625                }
626            }
627        }
628    }
629
630    /// Updates all package logs in client registry storage to the latest registry checkpoint.
631    pub async fn update(&self) -> ClientResult<()> {
632        tracing::info!("updating downloaded package logs");
633
634        for mut packages in self.registry.load_all_packages().await?.into_values() {
635            self.update_checkpoints(&mut packages).await?;
636        }
637
638        Ok(())
639    }
640
641    /// Downloads the latest version of a package into client storage that
642    /// satisfies the given version requirement.
643    ///
644    /// If the requested package log is not present in client storage, it
645    /// will be fetched from the registry first.
646    ///
647    /// An error is returned if the package does not exist.
648    ///
649    /// If a version satisfying the requirement does not exist, `None` is
650    /// returned.
651    ///
652    /// Returns the path within client storage of the package contents for
653    /// the resolved version.
654    pub async fn download(
655        &self,
656        package: &PackageName,
657        requirement: &VersionReq,
658    ) -> Result<Option<PackageDownload>, ClientError> {
659        let info = self.package(package).await?;
660
661        let registry_domain = self.get_warg_registry(package.namespace()).await?;
662
663        tracing::debug!(
664            package = package.as_ref(),
665            version_requirement = requirement.to_string(),
666            registry_header = ?registry_domain,
667            "downloading",
668        );
669
670        match info.state.find_latest_release(requirement) {
671            Some(release) => {
672                let digest = release
673                    .content()
674                    .context("invalid state: not yanked but missing content")?
675                    .clone();
676                let path = self
677                    .download_content(registry_domain.as_ref(), &digest)
678                    .await?;
679                Ok(Some(PackageDownload {
680                    version: release.version.clone(),
681                    digest,
682                    path,
683                }))
684            }
685            None => Ok(None),
686        }
687    }
688
689    /// Downloads the latest version of a package.
690    ///
691    /// If the requested package log is not present in client storage, it
692    /// will be fetched from the registry first.
693    ///
694    /// An error is returned if the package does not exist.
695    ///
696    /// If a version satisfying the requirement does not exist, `None` is
697    /// returned.
698    pub async fn download_as_stream(
699        &self,
700        package: &PackageName,
701        requirement: &VersionReq,
702    ) -> Result<Option<(PackageDownloadInfo, impl Stream<Item = Result<Bytes>>)>, ClientError> {
703        let info = self.package(package).await?;
704
705        let registry_domain = self.get_warg_registry(package.namespace()).await?;
706
707        tracing::debug!(
708            package = package.as_ref(),
709            version_requirement = requirement.to_string(),
710            registry_header = ?registry_domain,
711            "downloading",
712        );
713
714        match info.state.find_latest_release(requirement) {
715            Some(release) => {
716                let digest = release
717                    .content()
718                    .context("invalid state: not yanked but missing content")?
719                    .clone();
720                let stream = self
721                    .download_content_stream(registry_domain.as_ref(), &digest)
722                    .await?;
723                Ok(Some((
724                    PackageDownloadInfo {
725                        version: release.version.clone(),
726                        digest,
727                    },
728                    stream,
729                )))
730            }
731            None => Ok(None),
732        }
733    }
734
735    /// Downloads the specified version of a package into client storage.
736    ///
737    /// If the requested package log is not present in client storage, it
738    /// will be fetched from the registry first.
739    ///
740    /// An error is returned if the package or version does not exist.
741    ///
742    /// Returns the path within client storage of the package contents for
743    /// the specified version.
744    pub async fn download_exact(
745        &self,
746        package: &PackageName,
747        version: &Version,
748    ) -> Result<PackageDownload, ClientError> {
749        let info = self.package(package).await?;
750
751        let registry_domain = self.get_warg_registry(package.namespace()).await?;
752
753        tracing::debug!(
754            package = package.as_ref(),
755            version = version.to_string(),
756            registry_header = ?registry_domain,
757            "downloading exact version",
758        );
759
760        let release =
761            info.state
762                .release(version)
763                .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
764                    version: version.clone(),
765                    name: package.clone(),
766                })?;
767
768        let digest = release
769            .content()
770            .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
771                version: version.clone(),
772                name: package.clone(),
773            })?;
774
775        Ok(PackageDownload {
776            version: version.clone(),
777            digest: digest.clone(),
778            path: self
779                .download_content(registry_domain.as_ref(), digest)
780                .await?,
781        })
782    }
783
784    /// Downloads the specified version of a package.
785    ///
786    /// If the requested package log is not present in client storage, it
787    /// will be fetched from the registry first.
788    ///
789    /// An error is returned if the package or version does not exist.
790    pub async fn download_exact_as_stream(
791        &self,
792        package: &PackageName,
793        version: &Version,
794    ) -> Result<(PackageDownloadInfo, impl Stream<Item = Result<Bytes>>), ClientError> {
795        let info = self.package(package).await?;
796
797        let registry_domain = self.get_warg_registry(package.namespace()).await?;
798
799        tracing::debug!(
800            package = package.as_ref(),
801            version = version.to_string(),
802            registry_header = ?registry_domain,
803            "downloading exact version",
804        );
805
806        let release =
807            info.state
808                .release(version)
809                .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
810                    version: version.clone(),
811                    name: package.clone(),
812                })?;
813
814        let digest = release
815            .content()
816            .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
817                version: version.clone(),
818                name: package.clone(),
819            })?;
820
821        Ok((
822            PackageDownloadInfo {
823                version: version.clone(),
824                digest: digest.clone(),
825            },
826            self.download_content_stream(registry_domain.as_ref(), digest)
827                .await?,
828        ))
829    }
830
831    async fn update_packages_and_return_federated_packages<'a>(
832        &self,
833        registry_domain: Option<&RegistryDomain>,
834        packages: impl IntoIterator<Item = &'a mut PackageInfo>,
835    ) -> Result<IndexMap<Option<RegistryDomain>, Vec<&'a mut PackageInfo>>, ClientError> {
836        let ts_checkpoint = self.api.latest_checkpoint(registry_domain).await?;
837        let checkpoint = &ts_checkpoint.as_ref().checkpoint;
838
839        tracing::debug!(
840            log_length = checkpoint.log_length,
841            registry_header = ?registry_domain,
842            "updating to checkpoint",
843        );
844
845        // operator log info
846        let mut operator = self
847            .registry
848            .load_operator(registry_domain)
849            .await?
850            .unwrap_or_default();
851
852        // map package names to package logs that need to be updated
853        let mut packages = packages
854            .into_iter()
855            .filter_map(|p| match &p.checkpoint {
856                // Don't bother updating if the package is already at the specified checkpoint
857                // If `registry` field is not set, then update.
858                Some(c) if p.registry.is_some() && c == checkpoint => None,
859                _ => Some((LogId::package_log::<Sha256>(&p.name), p)),
860            })
861            .inspect(|(_, p)| tracing::info!("package `{name}` will be updated", name = p.name))
862            .collect::<IndexMap<_, _>>();
863
864        // if operator log and all packages are up to date at the latest checkpoint, then return
865        if operator.checkpoint.is_some_and(|c| &c == checkpoint) && packages.is_empty() {
866            return Ok(IndexMap::default());
867        }
868
869        // federated packages in other registries
870        let mut federated_packages: IndexMap<Option<RegistryDomain>, Vec<&mut PackageInfo>> =
871            IndexMap::with_capacity(packages.len());
872
873        // loop and fetch logs
874        let has_auth_token = self.api.auth_token().is_some();
875        loop {
876            let response = match self
877                .api
878                .fetch_logs(
879                    registry_domain,
880                    FetchLogsRequest {
881                        log_length: checkpoint.log_length,
882                        operator: operator
883                            .head_fetch_token
884                            .as_ref()
885                            .map(|t| Cow::Borrowed(t.as_str())),
886                        limit: None,
887                        // last known fetch token for each package log ID
888                        packages: Cow::Owned(
889                            packages
890                                .iter()
891                                .map(|(id, p)| (id.clone(), p.head_fetch_token.clone()))
892                                .collect::<IndexMap<_, _>>(),
893                        ),
894                    },
895                )
896                .await
897                .inspect(|res| {
898                    for warning in res.warnings.iter() {
899                        tracing::warn!("Fetch warning from registry: {}", warning.message);
900                    }
901                }) {
902                Ok(res) => Ok(res),
903                Err(err) => match &err {
904                    api::ClientError::Fetch(FetchError::LogNotFound(log_id))
905                    | api::ClientError::Package(PackageError::LogNotFound(log_id)) => {
906                        if let Some(name) = packages.get(log_id).map(|p| p.name.clone()) {
907                            Err(ClientError::PackageDoesNotExist {
908                                name,
909                                has_auth_token,
910                            })
911                        } else {
912                            Err(ClientError::Api(err))
913                        }
914                    }
915
916                    api::ClientError::LogNotFoundWithHint(log_id, hint)
917                        if self.disable_interactive =>
918                    {
919                        let name = packages.get(log_id).unwrap().name.clone();
920
921                        match hint.to_str().ok().map(|s| s.split_once('=')) {
922                            Some(Some((namespace, registry))) if packages.contains_key(log_id) => {
923                                Err(ClientError::PackageDoesNotExistWithHintHeader {
924                                    name,
925                                    has_auth_token,
926                                    hint_namespace: namespace.to_string(),
927                                    hint_registry: registry.to_string(),
928                                })
929                            }
930                            _ => Err(ClientError::PackageDoesNotExist {
931                                name,
932                                has_auth_token,
933                            }),
934                        }
935                    }
936
937                    #[cfg(feature = "cli-interactive")]
938                    api::ClientError::LogNotFoundWithHint(log_id, hint) => {
939                        match hint.to_str().ok().map(|s| s.split_once('=')) {
940                            Some(Some((namespace, registry)))
941                                if !self.ignore_federation_hints
942                                    && packages.contains_key(log_id) =>
943                            {
944                                use dialoguer::{theme::ColorfulTheme, Confirm};
945
946                                let package_name = &packages.get(log_id).unwrap().name;
947
948                                if !self.disable_auto_accept_federation_hints
949                                    || Confirm::with_theme(&ColorfulTheme::default())
950                                        .with_prompt(format!(
951"Package `{package_name}` is not in `{current_registry}` registry.
952Registry recommends using `{registry}` registry for packages in `{namespace}` namespace.
953Accept recommendation y/N\n",
954current_registry = registry_domain.map(|d| d.as_str()).unwrap_or(&self.url().safe_label()),
955))
956                                        .default(true)
957                                        .interact()
958                                        .unwrap()
959                                {
960                                    let federated_registry_domain =
961                                        Some(RegistryDomain::from_str(registry)?);
962                                    self.store_namespace(
963                                        namespace.to_string(),
964                                        federated_registry_domain.clone().unwrap(),
965                                    )
966                                    .await?;
967
968                                    // filter packages with namespace in other registry
969                                    packages = packages
970                                        .into_iter()
971                                        .filter_map(|(log_id, package_info)| {
972                                            if package_info.name.namespace() == namespace {
973                                                if let Some(package_set) = federated_packages
974                                                    .get_mut(&federated_registry_domain)
975                                                {
976                                                    package_set.push(package_info);
977                                                } else {
978                                                    federated_packages.insert(
979                                                        federated_registry_domain.clone(),
980                                                        vec![package_info],
981                                                    );
982                                                }
983
984                                                None
985                                            } else {
986                                                Some((log_id, package_info))
987                                            }
988                                        })
989                                        .collect();
990
991                                    // continue fetching logs from this registry
992                                    continue;
993                                } else {
994                                    Err(ClientError::PackageDoesNotExist {
995                                        name: package_name.clone(),
996                                        has_auth_token,
997                                    })
998                                }
999                            }
1000                            _ => {
1001                                if let Some(name) = packages.get(log_id).map(|p| p.name.clone()) {
1002                                    Err(ClientError::PackageDoesNotExist {
1003                                        name,
1004                                        has_auth_token,
1005                                    })
1006                                } else {
1007                                    Err(ClientError::Api(err))
1008                                }
1009                            }
1010                        }
1011                    }
1012                    _ => Err(ClientError::Api(err)),
1013                },
1014            }?;
1015
1016            for record in response.operator {
1017                let proto_envelope: PublishedProtoEnvelope<operator::OperatorRecord> =
1018                    record.envelope.try_into()?;
1019
1020                // skip over records that has already seen
1021                if operator.head_registry_index.is_none()
1022                    || proto_envelope.registry_index > operator.head_registry_index.unwrap()
1023                {
1024                    operator.state = operator
1025                        .state
1026                        .validate(&proto_envelope.envelope)
1027                        .map_err(|inner| ClientError::OperatorValidationFailed { inner })?;
1028                    operator.head_registry_index = Some(proto_envelope.registry_index);
1029                    operator.head_fetch_token = Some(record.fetch_token);
1030                }
1031            }
1032
1033            for (log_id, records) in response.packages {
1034                let package = packages.get_mut(&log_id).ok_or_else(|| {
1035                    anyhow!("received records for unknown package log `{log_id}`")
1036                })?;
1037
1038                for record in records {
1039                    let proto_envelope: PublishedProtoEnvelope<package::PackageRecord> =
1040                        record.envelope.try_into()?;
1041
1042                    // skip over records that has already seen
1043                    if package.head_registry_index.is_none()
1044                        || proto_envelope.registry_index > package.head_registry_index.unwrap()
1045                    {
1046                        let state = std::mem::take(&mut package.state);
1047                        package.state =
1048                            state.validate(&proto_envelope.envelope).map_err(|inner| {
1049                                ClientError::PackageValidationFailed {
1050                                    name: package.name.clone(),
1051                                    inner,
1052                                }
1053                            })?;
1054                        package.head_registry_index = Some(proto_envelope.registry_index);
1055                        package.head_fetch_token = Some(record.fetch_token);
1056                    }
1057                }
1058
1059                // At this point, the package log should not be empty
1060                if package.state.head().is_none() {
1061                    return Err(ClientError::PackageLogEmpty {
1062                        name: package.name.clone(),
1063                    });
1064                }
1065            }
1066
1067            if !response.more {
1068                break;
1069            }
1070        }
1071
1072        // verify checkpoint signature
1073        TimestampedCheckpoint::verify(
1074            operator
1075                .state
1076                .public_key(ts_checkpoint.to_owned().to_owned().key_id())
1077                .ok_or(ClientError::InvalidCheckpointKeyId {
1078                    key_id: ts_checkpoint.key_id().clone(),
1079                })?,
1080            &ts_checkpoint.as_ref().encode(),
1081            ts_checkpoint.signature(),
1082        )
1083        .or(Err(ClientError::InvalidCheckpointSignature))?;
1084
1085        // Prove inclusion for the current log heads
1086        let mut leaf_indices = Vec::with_capacity(packages.len() + 1 /* for operator */);
1087        let mut leafs = Vec::with_capacity(leaf_indices.len());
1088
1089        // operator record inclusion
1090        if let Some(index) = operator.head_registry_index {
1091            leaf_indices.push(index);
1092            leafs.push(LogLeaf {
1093                log_id: LogId::operator_log::<Sha256>(),
1094                record_id: operator.state.head().as_ref().unwrap().digest.clone(),
1095            });
1096        } else {
1097            return Err(ClientError::NoOperatorRecords);
1098        }
1099
1100        // package records inclusion
1101        for (log_id, package) in &packages {
1102            if let Some(index) = package.head_registry_index {
1103                leaf_indices.push(index);
1104                leafs.push(LogLeaf {
1105                    log_id: log_id.clone(),
1106                    record_id: package.state.head().as_ref().unwrap().digest.clone(),
1107                });
1108            } else {
1109                return Err(ClientError::PackageLogEmpty {
1110                    name: package.name.clone(),
1111                });
1112            }
1113        }
1114
1115        if !leafs.is_empty() {
1116            self.api
1117                .prove_inclusion(
1118                    registry_domain,
1119                    InclusionRequest {
1120                        log_length: checkpoint.log_length,
1121                        leafs: leaf_indices,
1122                    },
1123                    checkpoint,
1124                    &leafs,
1125                )
1126                .await?;
1127        }
1128
1129        if let Some(from) = self.registry.load_checkpoint(registry_domain).await? {
1130            let from_log_length = from.as_ref().checkpoint.log_length;
1131            let to_log_length = ts_checkpoint.as_ref().checkpoint.log_length;
1132
1133            match from_log_length.cmp(&to_log_length) {
1134                Ordering::Greater => {
1135                    return Err(ClientError::CheckpointLogLengthRewind {
1136                        from: from_log_length,
1137                        to: to_log_length,
1138                    });
1139                }
1140                Ordering::Less => {
1141                    self.api
1142                        .prove_log_consistency(
1143                            registry_domain,
1144                            ConsistencyRequest {
1145                                from: from_log_length,
1146                                to: to_log_length,
1147                            },
1148                            Cow::Borrowed(&from.as_ref().checkpoint.log_root),
1149                            Cow::Borrowed(&ts_checkpoint.as_ref().checkpoint.log_root),
1150                        )
1151                        .await?
1152                }
1153                Ordering::Equal => {
1154                    if from.as_ref().checkpoint.log_root
1155                        != ts_checkpoint.as_ref().checkpoint.log_root
1156                        || from.as_ref().checkpoint.map_root
1157                            != ts_checkpoint.as_ref().checkpoint.map_root
1158                    {
1159                        return Err(ClientError::CheckpointChangedLogRootOrMapRoot {
1160                            log_length: from_log_length,
1161                        });
1162                    }
1163                }
1164            }
1165        }
1166
1167        operator.registry = registry_domain
1168            .cloned()
1169            .or_else(|| Some(self.url().registry_domain()));
1170        operator.checkpoint = Some(checkpoint.clone()); // updated to this checkpoint
1171        self.registry
1172            .store_operator(registry_domain, operator)
1173            .await?;
1174
1175        for package in packages.values_mut() {
1176            package.registry = registry_domain
1177                .cloned()
1178                .or_else(|| Some(self.url().registry_domain()));
1179            package.checkpoint = Some(checkpoint.clone()); // updated to this checkpoint
1180            self.registry
1181                .store_package(registry_domain, package)
1182                .await?;
1183        }
1184
1185        self.registry
1186            .store_checkpoint(registry_domain, &ts_checkpoint)
1187            .await?;
1188
1189        // return packages to be retrieved from other registries
1190        Ok(federated_packages)
1191    }
1192
1193    /// Update checkpoint for list of packages
1194    async fn update_checkpoints<'a>(
1195        &self,
1196        packages: impl IntoIterator<Item = &mut PackageInfo>,
1197    ) -> Result<(), ClientError> {
1198        // first collect the packages that we already have namespace mappings for
1199        let mut federated_packages: IndexMap<Option<RegistryDomain>, Vec<&mut PackageInfo>> =
1200            IndexMap::new();
1201        for package in packages.into_iter() {
1202            let registry_domain = self.get_warg_registry(package.name.namespace()).await?;
1203            if let Some(package_set) = federated_packages.get_mut(&registry_domain) {
1204                package_set.push(package);
1205            } else {
1206                federated_packages.insert(registry_domain, vec![package]);
1207            }
1208        }
1209
1210        while let Some((registry_domain, packages)) = federated_packages.pop() {
1211            for (registry_domain, packages) in self
1212                .update_packages_and_return_federated_packages(registry_domain.as_ref(), packages)
1213                .await?
1214                .into_iter()
1215            {
1216                if let Some(package_set) = federated_packages.get_mut(&registry_domain) {
1217                    package_set.extend(packages);
1218                } else {
1219                    federated_packages.insert(registry_domain, packages);
1220                }
1221            }
1222        }
1223
1224        Ok(())
1225    }
1226
1227    /// Fetches package logs without checking local storage first.
1228    pub async fn fetch_packages(
1229        &self,
1230        names: impl IntoIterator<Item = &PackageName>,
1231    ) -> Result<Vec<PackageInfo>, ClientError> {
1232        let mut packages: Vec<PackageInfo> = names
1233            .into_iter()
1234            .map(|name| PackageInfo::new(name.clone()))
1235            .collect();
1236        self.update_checkpoints(packages.iter_mut()).await?;
1237        Ok(packages)
1238    }
1239
1240    /// Fetches the `PackageInfo` without checking local storage first.
1241    pub async fn fetch_package(&self, name: &PackageName) -> Result<PackageInfo, ClientError> {
1242        let mut info = PackageInfo::new(name.clone());
1243        self.update_checkpoints([&mut info]).await?;
1244        Ok(info)
1245    }
1246
1247    /// Retrieves the `PackageInfo` from local storage, if present, otherwise fetches from the
1248    /// registry.
1249    pub async fn package(&self, name: &PackageName) -> Result<PackageInfo, ClientError> {
1250        let registry_domain = self.get_warg_registry(name.namespace()).await?;
1251        match self
1252            .registry
1253            .load_package(registry_domain.as_ref(), name)
1254            .await?
1255        {
1256            Some(mut info) => {
1257                tracing::info!("log for package `{name}` already exists in storage");
1258                if info.registry.is_none() {
1259                    info.registry = registry_domain
1260                        .clone()
1261                        .or_else(|| Some(self.url().registry_domain()));
1262                }
1263                Ok(info)
1264            }
1265            None => {
1266                let mut info = PackageInfo::new(name.clone());
1267                self.update_checkpoints([&mut info]).await?;
1268                Ok(info)
1269            }
1270        }
1271    }
1272
1273    async fn get_package_record(
1274        &self,
1275        registry_domain: Option<&RegistryDomain>,
1276        package: &PackageName,
1277        log_id: &LogId,
1278        record_id: &RecordId,
1279    ) -> ClientResult<PackageRecord> {
1280        let record = self
1281            .api
1282            .get_package_record(registry_domain, log_id, record_id)
1283            .await
1284            .map_err(|e| match e {
1285                api::ClientError::Package(PackageError::Rejection(reason)) => {
1286                    ClientError::PublishRejected {
1287                        name: package.clone(),
1288                        reason,
1289                        record_id: record_id.clone(),
1290                    }
1291                }
1292                e => {
1293                    ClientError::translate_log_not_found(e, self.api.auth_token().is_some(), |id| {
1294                        if id == log_id {
1295                            Some(package.clone())
1296                        } else {
1297                            None
1298                        }
1299                    })
1300                }
1301            })?;
1302        Ok(record)
1303    }
1304
1305    /// Downloads the content for the specified digest into client storage.
1306    ///
1307    /// If the content already exists in client storage, the existing path
1308    /// is returned.
1309    async fn download_content(
1310        &self,
1311        registry_domain: Option<&RegistryDomain>,
1312        digest: &AnyHash,
1313    ) -> Result<PathBuf, ClientError> {
1314        match self.content.content_location(digest) {
1315            Some(path) => {
1316                tracing::info!("content for digest `{digest}` already exists in storage");
1317                Ok(path)
1318            }
1319            None => {
1320                self.content
1321                    .store_content(
1322                        Box::pin(self.api.download_content(registry_domain, digest).await?),
1323                        Some(digest),
1324                    )
1325                    .await?;
1326
1327                self.content
1328                    .content_location(digest)
1329                    .ok_or_else(|| ClientError::ContentNotFound {
1330                        digest: digest.clone(),
1331                    })
1332            }
1333        }
1334    }
1335
1336    /// Downloads the content for the specified digest as a stream.
1337    ///
1338    /// If the content already exists in client storage, it is read from the client storage.
1339    ///
1340    /// The download is not stored in client storage.
1341    async fn download_content_stream(
1342        &self,
1343        registry_domain: Option<&RegistryDomain>,
1344        digest: &AnyHash,
1345    ) -> Result<impl Stream<Item = Result<Bytes>>, ClientError> {
1346        match self.content.content_location(digest) {
1347            Some(path) => {
1348                tracing::info!("content for digest `{digest}` already exists in storage");
1349                let file = tokio::fs::File::open(path)
1350                    .await
1351                    .map_err(ClientError::IoError)?;
1352                Ok(ReaderStream::new(file).map_err(Into::into).boxed())
1353            }
1354            None => Ok(Box::pin(
1355                self.api.download_content(registry_domain, digest).await?,
1356            )),
1357        }
1358    }
1359}
/// A Warg registry client that uses the local file system to store
/// package logs and content.
///
/// This is [`Client`] specialized with [`FileSystemRegistryStorage`],
/// [`FileSystemContentStorage`] and [`FileSystemNamespaceMapStorage`].
pub type FileSystemClient =
    Client<FileSystemRegistryStorage, FileSystemContentStorage, FileSystemNamespaceMapStorage>;
1364
/// A result of an attempt to lock client storage.
///
/// Returned by the `try_new_*` constructors of `FileSystemClient`, which do not
/// wait for a storage lock that is currently unavailable.
#[derive(Debug)]
pub enum StorageLockResult<T> {
    /// The storage lock was acquired.
    Acquired(T),
    /// The storage lock was not acquired for the specified directory.
    NotAcquired(PathBuf),
}
1372
1373impl FileSystemClient {
1374    async fn storage_paths(
1375        url: Option<&str>,
1376        config: &Config,
1377        disable_interactive: bool,
1378    ) -> Result<StoragePaths, ClientError> {
1379        let checking_url_for_well_known = RegistryUrl::new(
1380            url.or(config.home_url.as_deref())
1381                .unwrap_or(DEFAULT_REGISTRY),
1382        )?;
1383
1384        let url = if let Some(warg_url) =
1385            api::Client::new(checking_url_for_well_known.to_string(), None)?
1386                .well_known_config()
1387                .await?
1388        {
1389            if !disable_interactive && warg_url != checking_url_for_well_known {
1390                println!(
1391                    "Resolved `{well_known}` to registry hosted on `{registry}`",
1392                    well_known = checking_url_for_well_known.registry_domain(),
1393                    registry = warg_url.registry_domain(),
1394                );
1395            }
1396            warg_url
1397        } else {
1398            RegistryUrl::new(
1399                url.or(config.home_url.as_deref())
1400                    .ok_or(ClientError::NoHomeRegistryUrl)?,
1401            )?
1402        };
1403
1404        config.storage_paths_for_url(url)
1405    }
1406
1407    /// Attempts to create a client for the given registry URL.
1408    ///
1409    /// If the URL is `None`, the home registry URL is used; if there is no home registry
1410    /// URL, an error is returned.
1411    ///
1412    /// If a lock cannot be acquired for a storage directory, then
1413    /// `NewClientResult::Blocked` is returned with the path to the
1414    /// directory that could not be locked.
1415    pub async fn try_new_with_config(
1416        registry: Option<&str>,
1417        config: &Config,
1418        mut auth_token: Option<Secret<String>>,
1419    ) -> Result<StorageLockResult<Self>, ClientError> {
1420        let disable_interactive =
1421            cfg!(not(feature = "cli-interactive")) || config.disable_interactive;
1422
1423        let StoragePaths {
1424            registry_url: url,
1425            registries_dir,
1426            content_dir,
1427            namespace_map_path,
1428        } = Self::storage_paths(registry, config, disable_interactive).await?;
1429
1430        let (keyring_backend, keys) = if cfg!(feature = "keyring") {
1431            (config.keyring_backend.clone(), config.keys.clone())
1432        } else {
1433            (None, IndexSet::new())
1434        };
1435
1436        #[cfg(feature = "keyring")]
1437        if auth_token.is_none() && config.keyring_auth {
1438            auth_token = crate::keyring::Keyring::from_config(config)?.get_auth_token(&url)?
1439        }
1440
1441        let (packages, content, namespace_map) = match (
1442            FileSystemRegistryStorage::try_lock(registries_dir.clone())?,
1443            FileSystemContentStorage::try_lock(content_dir.clone())?,
1444            FileSystemNamespaceMapStorage::new(namespace_map_path.clone()),
1445        ) {
1446            (Some(packages), Some(content), namespace_map) => (packages, content, namespace_map),
1447            (None, _, _) => return Ok(StorageLockResult::NotAcquired(registries_dir)),
1448            (_, None, _) => return Ok(StorageLockResult::NotAcquired(content_dir)),
1449        };
1450
1451        Ok(StorageLockResult::Acquired(Self::new(
1452            url.into_url(),
1453            packages,
1454            content,
1455            namespace_map,
1456            auth_token,
1457            config.ignore_federation_hints,
1458            config.disable_auto_accept_federation_hints,
1459            config.disable_auto_package_init,
1460            disable_interactive,
1461            keyring_backend,
1462            keys,
1463        )?))
1464    }
1465
1466    /// Attempts to create a client for the given registry URL.
1467    ///
1468    /// If the URL is `None`, the home registry URL is used; if there is no home registry
1469    /// URL, an error is returned.
1470    ///
1471    /// If a lock cannot be acquired for a storage directory, then
1472    /// `NewClientResult::Blocked` is returned with the path to the
1473    /// directory that could not be locked.
1474    ///
1475    /// Same as calling `try_new_with_config` with
1476    /// `Config::from_default_file()?.unwrap_or_default()`.
1477    pub async fn try_new_with_default_config(
1478        url: Option<&str>,
1479    ) -> Result<StorageLockResult<Self>, ClientError> {
1480        Self::try_new_with_config(url, &Config::from_default_file()?.unwrap_or_default(), None)
1481            .await
1482    }
1483
1484    /// Creates a client for the given registry URL.
1485    ///
1486    /// If the URL is `None`, the home registry URL is used; if there is no home registry
1487    /// URL, an error is returned.
1488    ///
1489    /// This method blocks if storage locks cannot be acquired.
1490    pub async fn new_with_config(
1491        registry: Option<&str>,
1492        config: &Config,
1493        mut auth_token: Option<Secret<String>>,
1494    ) -> Result<Self, ClientError> {
1495        let disable_interactive =
1496            cfg!(not(feature = "cli-interactive")) || config.disable_interactive;
1497
1498        let StoragePaths {
1499            registry_url: url,
1500            registries_dir,
1501            content_dir,
1502            namespace_map_path,
1503        } = Self::storage_paths(registry, config, disable_interactive).await?;
1504
1505        let (keyring_backend, keys) = if cfg!(feature = "keyring") {
1506            (config.keyring_backend.clone(), config.keys.clone())
1507        } else {
1508            (None, IndexSet::new())
1509        };
1510
1511        #[cfg(feature = "keyring")]
1512        if auth_token.is_none() && config.keyring_auth {
1513            auth_token = crate::keyring::Keyring::from_config(config)?.get_auth_token(&url)?
1514        }
1515
1516        Self::new(
1517            url.into_url(),
1518            FileSystemRegistryStorage::lock(registries_dir)?,
1519            FileSystemContentStorage::lock(content_dir)?,
1520            FileSystemNamespaceMapStorage::new(namespace_map_path),
1521            auth_token,
1522            config.ignore_federation_hints,
1523            config.disable_auto_accept_federation_hints,
1524            config.disable_auto_package_init,
1525            disable_interactive,
1526            keyring_backend,
1527            keys,
1528        )
1529    }
1530
1531    /// Creates a client for the given registry URL.
1532    ///
1533    /// If the URL is `None`, the home registry URL is used; if there is no home registry
1534    /// URL, an error is returned.
1535    ///
1536    /// This method blocks if storage locks cannot be acquired.
1537    ///
1538    /// Same as calling `new_with_config` with
1539    /// `Config::from_default_file()?.unwrap_or_default()`.
1540    pub async fn new_with_default_config(url: Option<&str>) -> Result<Self, ClientError> {
1541        Self::new_with_config(url, &Config::from_default_file()?.unwrap_or_default(), None).await
1542    }
1543}
1544
1545/// Represents information about a downloaded package.
1546#[derive(Debug, Clone)]
1547pub struct PackageDownload {
1548    /// The package version that was downloaded.
1549    pub version: Version,
1550    /// The digest of the package contents.
1551    pub digest: AnyHash,
1552    /// The path to the downloaded package contents.
1553    pub path: PathBuf,
1554}
1555
1556/// Represents information about a downloaded package.
1557pub struct PackageDownloadInfo {
1558    /// The package version that was downloaded.
1559    pub version: Version,
1560    /// The digest of the package contents.
1561    pub digest: AnyHash,
1562}
1563
1564/// Represents an error returned by Warg registry clients.
1565#[derive(Debug, Error)]
1566pub enum ClientError {
1567    /// No home registry registry server URL is configured.
1568    #[error("no home registry registry server URL is configured")]
1569    NoHomeRegistryUrl,
1570
1571    /// Reset registry local state.
1572    #[error("reset registry state failed")]
1573    ResettingRegistryLocalStateFailed,
1574
1575    /// Clearing content local cache.
1576    #[error("clear content cache failed")]
1577    ClearContentCacheFailed,
1578
1579    /// Unauthorized rejection
1580    #[error("unauthorized: {0}")]
1581    Unauthorized(String),
1582
1583    /// Checkpoint signature failed verification
1584    #[error("invalid checkpoint signature")]
1585    InvalidCheckpointSignature,
1586
1587    /// Checkpoint signature failed verification
1588    #[error("invalid checkpoint key ID `{key_id}`")]
1589    InvalidCheckpointKeyId {
1590        /// The signature key ID.
1591        key_id: signing::KeyID,
1592    },
1593
1594    /// The server did not provide operator records.
1595    #[error("the server did not provide any operator records")]
1596    NoOperatorRecords,
1597
1598    /// The operator failed validation.
1599    #[error("operator failed validation: {inner}")]
1600    OperatorValidationFailed {
1601        /// The validation error.
1602        inner: operator::ValidationError,
1603    },
1604
1605    /// The package already exists and cannot be initialized.
1606    #[error("package `{name}` already exists and cannot be initialized")]
1607    CannotInitializePackage {
1608        /// The package name that already exists.
1609        name: PackageName,
1610        /// The record identifier for the init record.
1611        init_record_id: Option<RecordId>,
1612    },
1613
1614    /// The package must be initialized before publishing.
1615    #[error("package `{name}` must be initialized before publishing")]
1616    MustInitializePackage {
1617        /// The name of the package that must be initialized.
1618        name: PackageName,
1619        /// Client has authentication credentials.
1620        has_auth_token: bool,
1621    },
1622
1623    /// There is no publish operation in progress.
1624    #[error("there is no publish operation in progress")]
1625    NotPublishing,
1626
1627    /// The package has no records to publish.
1628    #[error("package `{name}` has no records to publish")]
1629    NothingToPublish {
1630        /// The package that has no publish operations.
1631        name: PackageName,
1632    },
1633
1634    /// The package does not exist.
1635    #[error("package `{name}` does not exist")]
1636    PackageDoesNotExist {
1637        /// The missing package.
1638        name: PackageName,
1639        /// Client has authentication credentials.
1640        has_auth_token: bool,
1641    },
1642
1643    /// The package does not exist with hint header.
1644    #[error("package `{name}` does not exist but the registry suggests checking registry `{hint_registry}` for packages in namespace `{hint_namespace}`")]
1645    PackageDoesNotExistWithHintHeader {
1646        /// The missing package.
1647        name: PackageName,
1648        /// Client has authentication credentials.
1649        has_auth_token: bool,
1650        /// The hint namespace.
1651        hint_namespace: String,
1652        /// The hint registry.
1653        hint_registry: String,
1654    },
1655
1656    /// The package version does not exist.
1657    #[error("version `{version}` of package `{name}` does not exist")]
1658    PackageVersionDoesNotExist {
1659        /// The missing version of the package.
1660        version: Version,
1661        /// The package with the missing version.
1662        name: PackageName,
1663    },
1664
1665    /// The package version requirement does not exist.
1666    #[error("version that satisfies requirement `{version}` was not found for package `{name}`")]
1667    PackageVersionRequirementDoesNotExist {
1668        /// The missing version requirement of the package.
1669        version: VersionReq,
1670        /// The package with the missing version.
1671        name: PackageName,
1672    },
1673
1674    /// The package failed validation.
1675    #[error("package `{name}` failed validation: {inner}")]
1676    PackageValidationFailed {
1677        /// The package that failed validation.
1678        name: PackageName,
1679        /// The validation error.
1680        inner: package::ValidationError,
1681    },
1682
1683    /// Content was not found during a publish operation.
1684    #[error("content with digest `{digest}` was not found in client storage")]
1685    ContentNotFound {
1686        /// The digest of the missing content.
1687        digest: AnyHash,
1688    },
1689
1690    /// Content digest was different than expected.
1691    #[error("content with digest `{digest}` was not found expected `{expected}`")]
1692    IncorrectContent {
1693        /// The digest of the missing content.
1694        digest: AnyHash,
1695        /// The expected
1696        expected: AnyHash,
1697    },
1698
1699    /// The package log is empty and cannot be validated.
1700    #[error("package log is empty and cannot be validated")]
1701    PackageLogEmpty {
1702        /// The package with an empty package log.
1703        name: PackageName,
1704    },
1705
1706    /// A publish operation was rejected.
1707    #[error("the publishing of package `{name}` was rejected due to: {reason}")]
1708    PublishRejected {
1709        /// The package that was rejected.
1710        name: PackageName,
1711        /// The record identifier for the record that was rejected.
1712        record_id: RecordId,
1713        /// The reason it was rejected.
1714        reason: String,
1715    },
1716
1717    /// A publish operation was rejected due to conflicting pending publish.
1718    #[error("the publishing of package `{name}` was rejected due to conflicting pending publish of record `{pending_record_id}`")]
1719    ConflictPendingPublish {
1720        /// The package that was rejected.
1721        name: PackageName,
1722        /// The record identifier for the record that was rejected.
1723        record_id: RecordId,
1724        /// The record identifier for the pending publish record.
1725        pending_record_id: RecordId,
1726    },
1727
1728    /// The package is still missing content.
1729    #[error("the package is still missing content after all content was uploaded")]
1730    PackageMissingContent,
1731
1732    /// The registry provided a latest checkpoint with a log length less than a previously provided
1733    /// checkpoint log length.
1734    #[error("registry rewinded checkpoints; latest checkpoint log length `{to}` is less than previously received checkpoint log length `{from}`")]
1735    CheckpointLogLengthRewind {
1736        /// The previously received checkpoint log length.
1737        from: RegistryLen,
1738        /// The latest checkpoint log length.
1739        to: RegistryLen,
1740    },
1741
1742    /// The registry provided a checkpoint with a different `log_root` and
1743    /// `map_root` than a previously provided checkpoint.
1744    #[error("registry provided a new checkpoint with the same log length `{log_length}` as previously fetched but different log root or map root")]
1745    CheckpointChangedLogRootOrMapRoot {
1746        /// The checkpoint log length.
1747        log_length: RegistryLen,
1748    },
1749
1750    /// An error occurred while accessing the keyring.
1751    #[error(transparent)]
1752    Keyring(#[from] crate::keyring::KeyringError),
1753
1754    /// An error occurred during an API operation.
1755    #[error(transparent)]
1756    Api(#[from] api::ClientError),
1757
1758    /// An error occurred while performing a client operation.
1759    #[error("{0:?}")]
1760    Other(#[from] anyhow::Error),
1761
1762    /// An error occurred while performing a IO.
1763    #[error("error: {0:?}")]
1764    IoError(#[from] std::io::Error),
1765}
1766
1767impl ClientError {
1768    fn translate_log_not_found(
1769        e: api::ClientError,
1770        has_auth_token: bool,
1771        lookup: impl Fn(&LogId) -> Option<PackageName>,
1772    ) -> Self {
1773        match &e {
1774            api::ClientError::Fetch(FetchError::LogNotFound(id))
1775            | api::ClientError::Package(PackageError::LogNotFound(id)) => {
1776                if let Some(name) = lookup(id) {
1777                    return Self::PackageDoesNotExist {
1778                        name,
1779                        has_auth_token,
1780                    };
1781                }
1782            }
1783            _ => {}
1784        }
1785
1786        Self::Api(e)
1787    }
1788}
1789
1790/// Represents the result of a client operation.
1791pub type ClientResult<T> = Result<T, ClientError>;