warg_client/
lib.rs

1//! A client library for Warg component registries.
2
3#![deny(missing_docs)]
4use crate::storage::PackageInfo;
5
6use anyhow::{anyhow, Context, Result};
7use bytes::Bytes;
8use futures_util::{Stream, StreamExt, TryStreamExt};
9use indexmap::{IndexMap, IndexSet};
10use reqwest::{Body, IntoUrl};
11use secrecy::Secret;
12use semver::{Version, VersionReq};
13use std::cmp::Ordering;
14use std::fs;
15use std::str::FromStr;
16use std::{borrow::Cow, path::PathBuf, time::Duration};
17use storage::{
18    ContentStorage, FileSystemContentStorage, FileSystemNamespaceMapStorage,
19    FileSystemRegistryStorage, NamespaceMapStorage, PublishInfo, RegistryDomain, RegistryStorage,
20};
21use thiserror::Error;
22use tokio_util::io::ReaderStream;
23use warg_api::v1::{
24    fetch::{FetchError, FetchLogsRequest},
25    package::{
26        MissingContent, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest,
27        UploadEndpoint,
28    },
29    proof::{ConsistencyRequest, InclusionRequest},
30};
31use warg_crypto::hash::Sha256;
32use warg_crypto::{hash::AnyHash, signing, Encode, Signable};
33use warg_protocol::package::ReleaseState;
34use warg_protocol::{
35    operator, package,
36    registry::{LogId, LogLeaf, PackageName, RecordId, RegistryLen, TimestampedCheckpoint},
37    PublishedProtoEnvelope,
38};
39use wasm_compose::graph::{CompositionGraph, EncodeOptions, ExportIndex, InstanceId};
40use wasmparser::Validator;
41
42#[cfg(feature = "keyring")]
43pub mod keyring;
44
45pub mod api;
46mod config;
47/// Tools for locking and bundling components
48pub mod depsolve;
49use depsolve::{Bundler, LockListBuilder};
50/// Tools for semver
51pub mod version_util;
52use version_util::{kindless_name, locked_package, versioned_package, Import, ImportKind};
53pub mod lock;
54mod registry_url;
55pub mod storage;
56pub use self::config::*;
57pub use self::registry_url::RegistryUrl;
58
59const DEFAULT_WAIT_INTERVAL: Duration = Duration::from_secs(1);
60
61/// For Bytecode Alliance projects, the default registry is set to `bytecodealliance.org`.
62/// The `.well-known` config path may resolve to another domain where the registry is hosted.
63pub const DEFAULT_REGISTRY: &str = "bytecodealliance.org";
64
65/// A client for a Warg registry.
66pub struct Client<R, C, N>
67where
68    R: RegistryStorage,
69    C: ContentStorage,
70    N: NamespaceMapStorage,
71{
72    registry: R,
73    content: C,
74    namespace_map: N,
75    api: api::Client,
76    ignore_federation_hints: bool,
77    disable_auto_accept_federation_hints: bool,
78    disable_auto_package_init: bool,
79    disable_interactive: bool,
80    keyring_backend: Option<String>,
81    keys: IndexSet<String>,
82}
83
84impl<R: RegistryStorage, C: ContentStorage, N: NamespaceMapStorage> Client<R, C, N> {
85    /// Creates a new client for the given URL, registry storage, and
86    /// content storage.
87    #[allow(clippy::too_many_arguments)]
88    pub fn new(
89        url: impl IntoUrl,
90        registry: R,
91        content: C,
92        namespace_map: N,
93        auth_token: Option<Secret<String>>,
94        ignore_federation_hints: bool,
95        disable_auto_accept_federation_hints: bool,
96        disable_auto_package_init: bool,
97        disable_interactive: bool,
98        keyring_backend: Option<String>,
99        keys: IndexSet<String>,
100    ) -> ClientResult<Self> {
101        let api = api::Client::new(url, auth_token)?;
102        Ok(Self {
103            registry,
104            content,
105            namespace_map,
106            api,
107            ignore_federation_hints,
108            disable_auto_accept_federation_hints,
109            disable_auto_package_init,
110            disable_interactive,
111            keyring_backend,
112            keys,
113        })
114    }
115
116    /// Gets the URL of the client.
117    pub fn url(&self) -> &RegistryUrl {
118        self.api.url()
119    }
120
121    /// Gets the registry storage used by the client.
122    pub fn registry(&self) -> &R {
123        &self.registry
124    }
125
126    /// Gets the content storage used by the client.
127    pub fn content(&self) -> &C {
128        &self.content
129    }
130
131    /// Gets the namespace map
132    pub fn namespace_map(&self) -> &N {
133        &self.namespace_map
134    }
135
136    /// Get warg registry domain.
137    pub async fn get_warg_registry(
138        &self,
139        namespace: &str,
140    ) -> Result<Option<RegistryDomain>, ClientError> {
141        let operator = self
142            .registry()
143            .load_operator(Some(&RegistryDomain::from_str(namespace)?))
144            .await?;
145        if let Some(op) = operator {
146            match op.state.namespace_state(namespace) {
147                Some(warg_protocol::operator::NamespaceState::Imported { registry }) => {
148                    return Ok(Some(RegistryDomain::from_str(registry)?));
149                }
150                Some(warg_protocol::operator::NamespaceState::Defined) => {
151                    return Ok(None);
152                }
153                _ => (),
154            }
155        };
156        let nm_map = self.namespace_map.load_namespace_map().await?;
157        Ok(nm_map.and_then(|nm_map| {
158            nm_map
159                .get(namespace)
160                .map(|domain| RegistryDomain::from_str(domain).unwrap())
161        }))
162    }
163
164    /// Stores namespace mapping in local storage
165    pub async fn store_namespace(
166        &self,
167        namespace: String,
168        registry_domain: RegistryDomain,
169    ) -> Result<()> {
170        self.namespace_map
171            .store_namespace(namespace, registry_domain)
172            .await?;
173        Ok(())
174    }
175
176    /// Resets the namespace map
177    pub async fn reset_namespaces(&self) -> Result<()> {
178        self.namespace_map.reset_namespaces().await?;
179        Ok(())
180    }
181
182    /// Reset client storage for the registry.
183    pub async fn reset_registry(&self) -> ClientResult<()> {
184        tracing::info!("resetting registry local state");
185        self.registry
186            .reset(true)
187            .await
188            .or(Err(ClientError::ResettingRegistryLocalStateFailed))
189    }
190
191    /// Clear client content cache.
192    pub async fn clear_content_cache(&self) -> ClientResult<()> {
193        tracing::info!("removing content cache");
194        self.content
195            .clear()
196            .await
197            .or(Err(ClientError::ClearContentCacheFailed))
198    }
199
200    /// Locks component
201    pub async fn lock_component(&self, info: &PackageInfo) -> ClientResult<Vec<u8>> {
202        let mut builder = LockListBuilder::default();
203        builder.build_list(self, info).await?;
204        let top = Import {
205            name: format!("{}:{}", info.name.namespace(), info.name.name()),
206            req: VersionReq::STAR,
207            kind: ImportKind::Unlocked,
208        };
209        builder.lock_list.insert(top);
210        let mut composer = CompositionGraph::new();
211        let mut handled = IndexMap::<String, InstanceId>::new();
212        for package in builder.lock_list {
213            let name = package.name.clone();
214            let version = package.req;
215            let id = PackageName::new(name)?;
216            let info = self
217                .registry()
218                .load_package(self.get_warg_registry(id.namespace()).await?.as_ref(), &id)
219                .await?;
220            if let Some(inf) = info {
221                let release = if version != VersionReq::STAR {
222                    inf.state
223                        .releases()
224                        .filter(|r| version.matches(&r.version))
225                        .last()
226                } else {
227                    inf.state.releases().last()
228                };
229
230                if let Some(r) = release {
231                    let state = &r.state;
232                    if let ReleaseState::Released { content } = state {
233                        let locked_package = locked_package(&package.name, r, content);
234                        let path = self.content().content_location(content);
235                        if let Some(p) = path {
236                            let bytes = fs::read(&p).map_err(|_| ClientError::ContentNotFound {
237                                digest: content.clone(),
238                            })?;
239
240                            let read_digest =
241                                AnyHash::from_str(&format!("sha256:{}", sha256::digest(bytes)))
242                                    .unwrap();
243                            if content != &read_digest {
244                                return Err(ClientError::IncorrectContent {
245                                    digest: read_digest,
246                                    expected: content.clone(),
247                                });
248                            }
249                            let mut validator = Validator::new();
250                            let component = wasm_compose::graph::Component::from_file(
251                                &mut validator,
252                                &locked_package,
253                                p,
254                            )?;
255                            let component_id = if let Some((id, _)) =
256                                composer.get_component_by_name(&locked_package)
257                            {
258                                id
259                            } else {
260                                composer.add_component(component)?
261                            };
262                            let instance_id = composer.instantiate(component_id)?;
263                            let added = composer.get_component(component_id);
264                            handled.insert(versioned_package(&package.name, version), instance_id);
265                            let mut args = Vec::new();
266                            if let Some(added) = added {
267                                for (index, name, _) in added.imports() {
268                                    let iid = handled.get(kindless_name(name));
269                                    if let Some(arg) = iid {
270                                        args.push((arg, index));
271                                    }
272                                }
273                            }
274                            for arg in args {
275                                composer.connect(
276                                    *arg.0,
277                                    None::<ExportIndex>,
278                                    instance_id,
279                                    arg.1,
280                                )?;
281                            }
282                        }
283                    }
284                }
285            }
286        }
287        let final_name = &format!("{}:{}", info.name.namespace(), &info.name.name());
288        let id = handled.get(final_name);
289        let options = EncodeOptions {
290            export: id.copied(),
291            ..Default::default()
292        };
293        let locked = composer.encode(options)?;
294        fs::write("./locked.wasm", locked.as_slice()).map_err(|e| ClientError::Other(e.into()))?;
295        Ok(locked)
296    }
297
298    /// Bundles component
299    pub async fn bundle_component(&self, info: &PackageInfo) -> ClientResult<Vec<u8>> {
300        let mut bundler = Bundler::new(self);
301        let path = PathBuf::from("./locked.wasm");
302        let locked = if !path.is_file() {
303            self.lock_component(info).await?
304        } else {
305            fs::read("./locked.wasm").map_err(|e| ClientError::Other(e.into()))?
306        };
307        let bundled = bundler.parse(&locked).await?;
308        fs::write("./bundled.wasm", bundled.as_slice())
309            .map_err(|e| ClientError::Other(e.into()))?;
310        Ok(bundled.as_slice().to_vec())
311    }
312
313    /// Submits the publish information in client storage.
314    ///
315    /// If there's no publishing information in client storage, an error is returned.
316    ///
317    /// Returns the identifier of the record that was published.
318    ///
319    /// Use `wait_for_publish` to wait for the record to transition to the `published` state.
320    pub async fn publish(&self, signing_key: &signing::PrivateKey) -> ClientResult<RecordId> {
321        let info = self
322            .registry
323            .load_publish()
324            .await?
325            .ok_or(ClientError::NotPublishing)?;
326
327        let res = self.publish_with_info(signing_key, info).await;
328        self.registry.store_publish(None).await?;
329        res
330    }
331
332    /// Submits the provided publish information or, if not provided, loads from client
333    /// storage. Uses the keyring to retrieve a key and sign.
334    ///
335    /// If there's no publishing information in client storage, an error is returned.
336    ///
337    /// Returns the identifier of the record that was published.
338    ///
339    /// Use `wait_for_publish` to wait for the record to transition to the `published` state.
340    #[cfg(feature = "keyring")]
341    pub async fn sign_with_keyring_and_publish(
342        &self,
343        publish_info: Option<PublishInfo>,
344    ) -> ClientResult<RecordId> {
345        let publish_info = if let Some(publish_info) = publish_info {
346            publish_info
347        } else {
348            self.registry
349                .load_publish()
350                .await?
351                .ok_or(ClientError::NotPublishing)?
352        };
353
354        let registry_domain = self
355            .get_warg_registry(publish_info.name.namespace())
356            .await?;
357        let signing_key = keyring::Keyring::new(
358            self.keyring_backend
359                .as_deref()
360                .unwrap_or(keyring::Keyring::DEFAULT_BACKEND),
361        )?
362        .get_signing_key(
363            registry_domain.map(|domain| domain.to_string()).as_deref(),
364            &self.keys,
365            Some(&self.url().to_string()),
366        )?;
367
368        let res = self.publish_with_info(&signing_key, publish_info).await;
369        self.registry.store_publish(None).await?;
370        res
371    }
372
373    /// Submits the provided publish information.
374    ///
375    /// Any publish information in client storage is ignored.
376    ///
377    /// Returns the identifier of the record that was published.
378    ///
379    /// Use `wait_for_publish` to wait for the record to transition to the `published` state.
380    pub async fn publish_with_info(
381        &self,
382        signing_key: &signing::PrivateKey,
383        publish_info: PublishInfo,
384    ) -> ClientResult<RecordId> {
385        if publish_info.entries.is_empty() {
386            return Err(ClientError::NothingToPublish {
387                name: publish_info.name.clone(),
388            });
389        }
390
391        tracing::info!(
392            "publishing {new}package `{name}`",
393            name = publish_info.name,
394            new = if publish_info.initializing() {
395                "new "
396            } else {
397                ""
398            }
399        );
400        tracing::debug!("entries: {:?}", publish_info.entries);
401
402        let mut accepted_prompt_to_initialize = false;
403
404        let mut init_record_id: Option<RecordId> = None;
405
406        let (package, record) = loop {
407            let mut info = publish_info.clone();
408
409            let mut initializing = info.initializing();
410
411            let package = match self.fetch_package(&info.name).await {
412                Ok(package) => {
413                    if initializing {
414                        return Err(ClientError::CannotInitializePackage {
415                            name: package.name,
416                            init_record_id,
417                        });
418                    } else if info.head.is_none() {
419                        // If we're not initializing the package and a head was not explicitly specified,
420                        // set to the latest known head.
421                        info.head = package.state.head().as_ref().map(|h| h.digest.clone());
422                    }
423                    package
424                }
425                Err(ClientError::PackageDoesNotExist {
426                    name,
427                    has_auth_token,
428                }) => {
429                    if !initializing {
430                        if !self.disable_auto_package_init {
431                            info.entries.insert(0, crate::storage::PublishEntry::Init);
432                            initializing = true;
433                            accepted_prompt_to_initialize = true;
434                        } else {
435                            if self.disable_interactive || cfg!(not(feature = "cli-interactive")) {
436                                return Err(ClientError::MustInitializePackage {
437                                    name,
438                                    has_auth_token,
439                                });
440                            }
441
442                            #[cfg(feature = "cli-interactive")]
443                            {
444                                use crate::storage::PublishEntry;
445                                use dialoguer::{theme::ColorfulTheme, Confirm};
446
447                                if accepted_prompt_to_initialize
448                                    || Confirm::with_theme(&ColorfulTheme::default())
449                                        .with_prompt(format!(
450                                            "Package `{package_name}` was not found.
451    If it exists, you may not have access.
452    Attempt to create `{package_name}` and publish the release y/N\n",
453                                            package_name = &info.name,
454                                        ))
455                                        .default(false)
456                                        .interact()
457                                        .unwrap()
458                                {
459                                    info.entries.insert(0, PublishEntry::Init);
460                                    initializing = true;
461                                    accepted_prompt_to_initialize = true;
462                                } else {
463                                    return Err(ClientError::MustInitializePackage {
464                                        name,
465                                        has_auth_token,
466                                    });
467                                }
468                            }
469                        }
470                    }
471                    PackageInfo::new(info.name.clone())
472                }
473                err => err?,
474            };
475            let registry_domain = self.get_warg_registry(package.name.namespace()).await?;
476
477            let log_id = LogId::package_log::<Sha256>(&package.name);
478            let record = info.finalize(signing_key)?;
479            let record_id = RecordId::package_record::<Sha256>(&record);
480            let record = match self
481                .api
482                .publish_package_record(
483                    registry_domain.as_ref(),
484                    &log_id,
485                    PublishRecordRequest {
486                        package_name: Cow::Borrowed(&package.name),
487                        record: Cow::Owned(record.into()),
488                        content_sources: Default::default(),
489                    },
490                )
491                .await
492            {
493                Ok(record) => Ok(record),
494                Err(api::ClientError::Package(PackageError::Rejection(reason))) => {
495                    Err(ClientError::PublishRejected {
496                        name: package.name.clone(),
497                        reason,
498                        record_id,
499                    })
500                }
501                Err(api::ClientError::Package(PackageError::Unauthorized(reason))) => {
502                    Err(ClientError::Unauthorized(reason))
503                }
504                Err(api::ClientError::Package(PackageError::ConflictPendingPublish(
505                    pending_record_id,
506                ))) => {
507                    // conflicting pending publish succeeds,
508                    tracing::info!("waiting for conflicting publish to complete");
509                    // check registry for federated namespace mapping, if initializing
510                    if initializing {
511                        match self.fetch_package(&package.name).await {
512                            Ok(_) => {}
513                            // may not exist until conflicting publish completes
514                            Err(ClientError::PackageDoesNotExist { .. }) => {}
515                            Err(err) => return Err(err),
516                        }
517                        init_record_id = Some(pending_record_id.clone());
518                    }
519                    self.wait_for_publish(&package.name, &pending_record_id, DEFAULT_WAIT_INTERVAL)
520                        .await
521                        .map_err(|err| match err {
522                            ClientError::PackageMissingContent => {
523                                ClientError::ConflictPendingPublish {
524                                    name: package.name.clone(),
525                                    record_id,
526                                    pending_record_id,
527                                }
528                            }
529                            err => err,
530                        })?;
531
532                    continue;
533                }
534                Err(e) => Err(ClientError::translate_log_not_found(
535                    e,
536                    self.api.auth_token().is_some(),
537                    |id| {
538                        if id == &log_id {
539                            Some(package.name.clone())
540                        } else {
541                            None
542                        }
543                    },
544                )),
545            }?;
546
547            break (package, record);
548        };
549
550        // TODO: parallelize this
551        for (digest, MissingContent { upload }) in record.missing_content() {
552            // Upload the missing content, if the registry supports it
553            let Some(UploadEndpoint::Http {
554                method,
555                url,
556                headers,
557            }) = upload.first()
558            else {
559                continue;
560            };
561
562            self.api
563                .upload_content(
564                    method,
565                    url,
566                    headers,
567                    Body::wrap_stream(self.content.load_content(digest).await?.ok_or_else(
568                        || ClientError::ContentNotFound {
569                            digest: digest.clone(),
570                        },
571                    )?),
572                )
573                .await
574                .map_err(|e| match e {
575                    api::ClientError::Package(PackageError::Rejection(reason)) => {
576                        ClientError::PublishRejected {
577                            name: package.name.clone(),
578                            record_id: record.record_id.clone(),
579                            reason,
580                        }
581                    }
582                    api::ClientError::Package(PackageError::Unauthorized(reason)) => {
583                        ClientError::Unauthorized(reason)
584                    }
585                    _ => e.into(),
586                })?;
587        }
588
589        Ok(record.record_id)
590    }
591
592    /// Waits for a package record to transition to the `published` state.
593    ///
594    /// The `interval` is the amount of time to wait between checks.
595    ///
596    /// Returns an error if the package record was rejected.
597    pub async fn wait_for_publish(
598        &self,
599        package: &PackageName,
600        record_id: &RecordId,
601        interval: Duration,
602    ) -> ClientResult<()> {
603        let registry_domain = self.get_warg_registry(package.namespace()).await?;
604        let log_id = LogId::package_log::<Sha256>(package);
605        let mut current = self
606            .get_package_record(registry_domain.as_ref(), package, &log_id, record_id)
607            .await?;
608
609        loop {
610            match current.state {
611                PackageRecordState::Sourcing { .. } => {
612                    return Err(ClientError::PackageMissingContent);
613                }
614                PackageRecordState::Published { .. } => {
615                    self.fetch_package(package).await?;
616                    return Ok(());
617                }
618                PackageRecordState::Rejected { reason } => {
619                    return Err(ClientError::PublishRejected {
620                        name: package.clone(),
621                        record_id: record_id.clone(),
622                        reason,
623                    });
624                }
625                PackageRecordState::Processing => {
626                    tokio::time::sleep(interval).await;
627                    current = self
628                        .get_package_record(registry_domain.as_ref(), package, &log_id, record_id)
629                        .await?;
630                }
631            }
632        }
633    }
634
635    /// Updates all package logs in client registry storage to the latest registry checkpoint.
636    pub async fn update(&self) -> ClientResult<()> {
637        tracing::info!("updating downloaded package logs");
638
639        for mut packages in self.registry.load_all_packages().await?.into_values() {
640            self.update_checkpoints(&mut packages).await?;
641        }
642
643        Ok(())
644    }
645
646    /// Downloads the latest version of a package into client storage that
647    /// satisfies the given version requirement.
648    ///
649    /// If the requested package log is not present in client storage, it
650    /// will be fetched from the registry first.
651    ///
652    /// An error is returned if the package does not exist.
653    ///
654    /// If a version satisfying the requirement does not exist, `None` is
655    /// returned.
656    ///
657    /// Returns the path within client storage of the package contents for
658    /// the resolved version.
659    pub async fn download(
660        &self,
661        package: &PackageName,
662        requirement: &VersionReq,
663    ) -> Result<Option<PackageDownload>, ClientError> {
664        let info = self.package(package).await?;
665
666        let registry_domain = self.get_warg_registry(package.namespace()).await?;
667
668        tracing::debug!(
669            package = package.as_ref(),
670            version_requirement = requirement.to_string(),
671            registry_header = ?registry_domain,
672            "downloading",
673        );
674
675        match info.state.find_latest_release(requirement) {
676            Some(release) => {
677                let digest = release
678                    .content()
679                    .context("invalid state: not yanked but missing content")?
680                    .clone();
681                let path = self
682                    .download_content(registry_domain.as_ref(), &digest)
683                    .await?;
684                Ok(Some(PackageDownload {
685                    version: release.version.clone(),
686                    digest,
687                    path,
688                }))
689            }
690            None => Ok(None),
691        }
692    }
693
694    /// Downloads the latest version of a package.
695    ///
696    /// If the requested package log is not present in client storage, it
697    /// will be fetched from the registry first.
698    ///
699    /// An error is returned if the package does not exist.
700    ///
701    /// If a version satisfying the requirement does not exist, `None` is
702    /// returned.
703    pub async fn download_as_stream(
704        &self,
705        package: &PackageName,
706        requirement: &VersionReq,
707    ) -> Result<Option<(PackageDownloadInfo, impl Stream<Item = Result<Bytes>>)>, ClientError> {
708        let info = self.package(package).await?;
709
710        let registry_domain = self.get_warg_registry(package.namespace()).await?;
711
712        tracing::debug!(
713            package = package.as_ref(),
714            version_requirement = requirement.to_string(),
715            registry_header = ?registry_domain,
716            "downloading",
717        );
718
719        match info.state.find_latest_release(requirement) {
720            Some(release) => {
721                let digest = release
722                    .content()
723                    .context("invalid state: not yanked but missing content")?
724                    .clone();
725                let stream = self
726                    .download_content_stream(registry_domain.as_ref(), &digest)
727                    .await?;
728                Ok(Some((
729                    PackageDownloadInfo {
730                        version: release.version.clone(),
731                        digest,
732                    },
733                    stream,
734                )))
735            }
736            None => Ok(None),
737        }
738    }
739
740    /// Downloads the specified version of a package into client storage.
741    ///
742    /// If the requested package log is not present in client storage, it
743    /// will be fetched from the registry first.
744    ///
745    /// An error is returned if the package or version does not exist.
746    ///
747    /// Returns the path within client storage of the package contents for
748    /// the specified version.
749    pub async fn download_exact(
750        &self,
751        package: &PackageName,
752        version: &Version,
753    ) -> Result<PackageDownload, ClientError> {
754        let info = self.package(package).await?;
755
756        let registry_domain = self.get_warg_registry(package.namespace()).await?;
757
758        tracing::debug!(
759            package = package.as_ref(),
760            version = version.to_string(),
761            registry_header = ?registry_domain,
762            "downloading exact version",
763        );
764
765        let release =
766            info.state
767                .release(version)
768                .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
769                    version: version.clone(),
770                    name: package.clone(),
771                })?;
772
773        let digest = release
774            .content()
775            .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
776                version: version.clone(),
777                name: package.clone(),
778            })?;
779
780        Ok(PackageDownload {
781            version: version.clone(),
782            digest: digest.clone(),
783            path: self
784                .download_content(registry_domain.as_ref(), digest)
785                .await?,
786        })
787    }
788
789    /// Downloads the specified version of a package.
790    ///
791    /// If the requested package log is not present in client storage, it
792    /// will be fetched from the registry first.
793    ///
794    /// An error is returned if the package or version does not exist.
795    pub async fn download_exact_as_stream(
796        &self,
797        package: &PackageName,
798        version: &Version,
799    ) -> Result<(PackageDownloadInfo, impl Stream<Item = Result<Bytes>>), ClientError> {
800        let info = self.package(package).await?;
801
802        let registry_domain = self.get_warg_registry(package.namespace()).await?;
803
804        tracing::debug!(
805            package = package.as_ref(),
806            version = version.to_string(),
807            registry_header = ?registry_domain,
808            "downloading exact version",
809        );
810
811        let release =
812            info.state
813                .release(version)
814                .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
815                    version: version.clone(),
816                    name: package.clone(),
817                })?;
818
819        let digest = release
820            .content()
821            .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
822                version: version.clone(),
823                name: package.clone(),
824            })?;
825
826        Ok((
827            PackageDownloadInfo {
828                version: version.clone(),
829                digest: digest.clone(),
830            },
831            self.download_content_stream(registry_domain.as_ref(), digest)
832                .await?,
833        ))
834    }
835
836    async fn update_packages_and_return_federated_packages<'a>(
837        &self,
838        registry_domain: Option<&RegistryDomain>,
839        packages: impl IntoIterator<Item = &'a mut PackageInfo>,
840    ) -> Result<IndexMap<Option<RegistryDomain>, Vec<&'a mut PackageInfo>>, ClientError> {
841        let ts_checkpoint = self.api.latest_checkpoint(registry_domain).await?;
842        let checkpoint = &ts_checkpoint.as_ref().checkpoint;
843
844        tracing::debug!(
845            log_length = checkpoint.log_length,
846            registry_header = ?registry_domain,
847            "updating to checkpoint",
848        );
849
850        // operator log info
851        let mut operator = self
852            .registry
853            .load_operator(registry_domain)
854            .await?
855            .unwrap_or_default();
856
857        // map package names to package logs that need to be updated
858        let mut packages = packages
859            .into_iter()
860            .filter_map(|p| match &p.checkpoint {
861                // Don't bother updating if the package is already at the specified checkpoint
862                // If `registry` field is not set, then update.
863                Some(c) if p.registry.is_some() && c == checkpoint => None,
864                _ => Some((LogId::package_log::<Sha256>(&p.name), p)),
865            })
866            .inspect(|(_, p)| tracing::info!("package `{name}` will be updated", name = p.name))
867            .collect::<IndexMap<_, _>>();
868
869        // if operator log and all packages are up to date at the latest checkpoint, then return
870        if operator.checkpoint.is_some_and(|c| &c == checkpoint) && packages.is_empty() {
871            return Ok(IndexMap::default());
872        }
873
874        // federated packages in other registries
875        let mut federated_packages: IndexMap<Option<RegistryDomain>, Vec<&mut PackageInfo>> =
876            IndexMap::with_capacity(packages.len());
877
878        // loop and fetch logs
879        let has_auth_token = self.api.auth_token().is_some();
880        loop {
881            let response = match self
882                .api
883                .fetch_logs(
884                    registry_domain,
885                    FetchLogsRequest {
886                        log_length: checkpoint.log_length,
887                        operator: operator
888                            .head_fetch_token
889                            .as_ref()
890                            .map(|t| Cow::Borrowed(t.as_str())),
891                        limit: None,
892                        // last known fetch token for each package log ID
893                        packages: Cow::Owned(
894                            packages
895                                .iter()
896                                .map(|(id, p)| (id.clone(), p.head_fetch_token.clone()))
897                                .collect::<IndexMap<_, _>>(),
898                        ),
899                    },
900                )
901                .await
902                .inspect(|res| {
903                    for warning in res.warnings.iter() {
904                        tracing::warn!("Fetch warning from registry: {}", warning.message);
905                    }
906                }) {
907                Ok(res) => Ok(res),
908                Err(err) => match &err {
909                    api::ClientError::Fetch(FetchError::LogNotFound(log_id))
910                    | api::ClientError::Package(PackageError::LogNotFound(log_id)) => {
911                        if let Some(name) = packages.get(log_id).map(|p| p.name.clone()) {
912                            Err(ClientError::PackageDoesNotExist {
913                                name,
914                                has_auth_token,
915                            })
916                        } else {
917                            Err(ClientError::Api(err))
918                        }
919                    }
920
921                    api::ClientError::LogNotFoundWithHint(log_id, hint)
922                        if self.disable_interactive =>
923                    {
924                        let name = packages.get(log_id).unwrap().name.clone();
925
926                        match hint.to_str().ok().map(|s| s.split_once('=')) {
927                            Some(Some((namespace, registry))) if packages.contains_key(log_id) => {
928                                Err(ClientError::PackageDoesNotExistWithHintHeader {
929                                    name,
930                                    has_auth_token,
931                                    hint_namespace: namespace.to_string(),
932                                    hint_registry: registry.to_string(),
933                                })
934                            }
935                            _ => Err(ClientError::PackageDoesNotExist {
936                                name,
937                                has_auth_token,
938                            }),
939                        }
940                    }
941
942                    #[cfg(feature = "cli-interactive")]
943                    api::ClientError::LogNotFoundWithHint(log_id, hint) => {
944                        match hint.to_str().ok().map(|s| s.split_once('=')) {
945                            Some(Some((namespace, registry)))
946                                if !self.ignore_federation_hints
947                                    && packages.contains_key(log_id) =>
948                            {
949                                use dialoguer::{theme::ColorfulTheme, Confirm};
950
951                                let package_name = &packages.get(log_id).unwrap().name;
952
953                                if !self.disable_auto_accept_federation_hints
954                                    || Confirm::with_theme(&ColorfulTheme::default())
955                                        .with_prompt(format!(
956"Package `{package_name}` is not in `{current_registry}` registry.
957Registry recommends using `{registry}` registry for packages in `{namespace}` namespace.
958Accept recommendation y/N\n",
959current_registry = registry_domain.map(|d| d.as_str()).unwrap_or(&self.url().safe_label()),
960))
961                                        .default(true)
962                                        .interact()
963                                        .unwrap()
964                                {
965                                    let federated_registry_domain =
966                                        Some(RegistryDomain::from_str(registry)?);
967                                    self.store_namespace(
968                                        namespace.to_string(),
969                                        federated_registry_domain.clone().unwrap(),
970                                    )
971                                    .await?;
972
973                                    // filter packages with namespace in other registry
974                                    packages = packages
975                                        .into_iter()
976                                        .filter_map(|(log_id, package_info)| {
977                                            if package_info.name.namespace() == namespace {
978                                                if let Some(package_set) = federated_packages
979                                                    .get_mut(&federated_registry_domain)
980                                                {
981                                                    package_set.push(package_info);
982                                                } else {
983                                                    federated_packages.insert(
984                                                        federated_registry_domain.clone(),
985                                                        vec![package_info],
986                                                    );
987                                                }
988
989                                                None
990                                            } else {
991                                                Some((log_id, package_info))
992                                            }
993                                        })
994                                        .collect();
995
996                                    // continue fetching logs from this registry
997                                    continue;
998                                } else {
999                                    Err(ClientError::PackageDoesNotExist {
1000                                        name: package_name.clone(),
1001                                        has_auth_token,
1002                                    })
1003                                }
1004                            }
1005                            _ => {
1006                                if let Some(name) = packages.get(log_id).map(|p| p.name.clone()) {
1007                                    Err(ClientError::PackageDoesNotExist {
1008                                        name,
1009                                        has_auth_token,
1010                                    })
1011                                } else {
1012                                    Err(ClientError::Api(err))
1013                                }
1014                            }
1015                        }
1016                    }
1017                    _ => Err(ClientError::Api(err)),
1018                },
1019            }?;
1020
1021            for record in response.operator {
1022                let proto_envelope: PublishedProtoEnvelope<operator::OperatorRecord> =
1023                    record.envelope.try_into()?;
1024
1025                // skip over records that has already seen
1026                if operator.head_registry_index.is_none()
1027                    || proto_envelope.registry_index > operator.head_registry_index.unwrap()
1028                {
1029                    operator.state = operator
1030                        .state
1031                        .validate(&proto_envelope.envelope)
1032                        .map_err(|inner| ClientError::OperatorValidationFailed { inner })?;
1033                    operator.head_registry_index = Some(proto_envelope.registry_index);
1034                    operator.head_fetch_token = Some(record.fetch_token);
1035                }
1036            }
1037
1038            for (log_id, records) in response.packages {
1039                let package = packages.get_mut(&log_id).ok_or_else(|| {
1040                    anyhow!("received records for unknown package log `{log_id}`")
1041                })?;
1042
1043                for record in records {
1044                    let proto_envelope: PublishedProtoEnvelope<package::PackageRecord> =
1045                        record.envelope.try_into()?;
1046
1047                    // skip over records that has already seen
1048                    if package.head_registry_index.is_none()
1049                        || proto_envelope.registry_index > package.head_registry_index.unwrap()
1050                    {
1051                        let state = std::mem::take(&mut package.state);
1052                        package.state =
1053                            state.validate(&proto_envelope.envelope).map_err(|inner| {
1054                                ClientError::PackageValidationFailed {
1055                                    name: package.name.clone(),
1056                                    inner,
1057                                }
1058                            })?;
1059                        package.head_registry_index = Some(proto_envelope.registry_index);
1060                        package.head_fetch_token = Some(record.fetch_token);
1061                    }
1062                }
1063
1064                // At this point, the package log should not be empty
1065                if package.state.head().is_none() {
1066                    return Err(ClientError::PackageLogEmpty {
1067                        name: package.name.clone(),
1068                    });
1069                }
1070            }
1071
1072            if !response.more {
1073                break;
1074            }
1075        }
1076
1077        // verify checkpoint signature
1078        TimestampedCheckpoint::verify(
1079            operator
1080                .state
1081                .public_key(ts_checkpoint.to_owned().to_owned().key_id())
1082                .ok_or(ClientError::InvalidCheckpointKeyId {
1083                    key_id: ts_checkpoint.key_id().clone(),
1084                })?,
1085            &ts_checkpoint.as_ref().encode(),
1086            ts_checkpoint.signature(),
1087        )
1088        .or(Err(ClientError::InvalidCheckpointSignature))?;
1089
1090        // Prove inclusion for the current log heads
1091        let mut leaf_indices = Vec::with_capacity(packages.len() + 1 /* for operator */);
1092        let mut leafs = Vec::with_capacity(leaf_indices.len());
1093
1094        // operator record inclusion
1095        if let Some(index) = operator.head_registry_index {
1096            leaf_indices.push(index);
1097            leafs.push(LogLeaf {
1098                log_id: LogId::operator_log::<Sha256>(),
1099                record_id: operator.state.head().as_ref().unwrap().digest.clone(),
1100            });
1101        } else {
1102            return Err(ClientError::NoOperatorRecords);
1103        }
1104
1105        // package records inclusion
1106        for (log_id, package) in &packages {
1107            if let Some(index) = package.head_registry_index {
1108                leaf_indices.push(index);
1109                leafs.push(LogLeaf {
1110                    log_id: log_id.clone(),
1111                    record_id: package.state.head().as_ref().unwrap().digest.clone(),
1112                });
1113            } else {
1114                return Err(ClientError::PackageLogEmpty {
1115                    name: package.name.clone(),
1116                });
1117            }
1118        }
1119
1120        if !leafs.is_empty() {
1121            self.api
1122                .prove_inclusion(
1123                    registry_domain,
1124                    InclusionRequest {
1125                        log_length: checkpoint.log_length,
1126                        leafs: leaf_indices,
1127                    },
1128                    checkpoint,
1129                    &leafs,
1130                )
1131                .await?;
1132        }
1133
1134        if let Some(from) = self.registry.load_checkpoint(registry_domain).await? {
1135            let from_log_length = from.as_ref().checkpoint.log_length;
1136            let to_log_length = ts_checkpoint.as_ref().checkpoint.log_length;
1137
1138            match from_log_length.cmp(&to_log_length) {
1139                Ordering::Greater => {
1140                    return Err(ClientError::CheckpointLogLengthRewind {
1141                        from: from_log_length,
1142                        to: to_log_length,
1143                    });
1144                }
1145                Ordering::Less => {
1146                    self.api
1147                        .prove_log_consistency(
1148                            registry_domain,
1149                            ConsistencyRequest {
1150                                from: from_log_length,
1151                                to: to_log_length,
1152                            },
1153                            Cow::Borrowed(&from.as_ref().checkpoint.log_root),
1154                            Cow::Borrowed(&ts_checkpoint.as_ref().checkpoint.log_root),
1155                        )
1156                        .await?
1157                }
1158                Ordering::Equal => {
1159                    if from.as_ref().checkpoint.log_root
1160                        != ts_checkpoint.as_ref().checkpoint.log_root
1161                        || from.as_ref().checkpoint.map_root
1162                            != ts_checkpoint.as_ref().checkpoint.map_root
1163                    {
1164                        return Err(ClientError::CheckpointChangedLogRootOrMapRoot {
1165                            log_length: from_log_length,
1166                        });
1167                    }
1168                }
1169            }
1170        }
1171
1172        operator.registry = registry_domain
1173            .cloned()
1174            .or_else(|| Some(self.url().registry_domain()));
1175        operator.checkpoint = Some(checkpoint.clone()); // updated to this checkpoint
1176        self.registry
1177            .store_operator(registry_domain, operator)
1178            .await?;
1179
1180        for package in packages.values_mut() {
1181            package.registry = registry_domain
1182                .cloned()
1183                .or_else(|| Some(self.url().registry_domain()));
1184            package.checkpoint = Some(checkpoint.clone()); // updated to this checkpoint
1185            self.registry
1186                .store_package(registry_domain, package)
1187                .await?;
1188        }
1189
1190        self.registry
1191            .store_checkpoint(registry_domain, &ts_checkpoint)
1192            .await?;
1193
1194        // return packages to be retrieved from other registries
1195        Ok(federated_packages)
1196    }
1197
1198    /// Update checkpoint for list of packages
1199    async fn update_checkpoints<'a>(
1200        &self,
1201        packages: impl IntoIterator<Item = &mut PackageInfo>,
1202    ) -> Result<(), ClientError> {
1203        // first collect the packages that we already have namespace mappings for
1204        let mut federated_packages: IndexMap<Option<RegistryDomain>, Vec<&mut PackageInfo>> =
1205            IndexMap::new();
1206        for package in packages.into_iter() {
1207            let registry_domain = self.get_warg_registry(package.name.namespace()).await?;
1208            if let Some(package_set) = federated_packages.get_mut(&registry_domain) {
1209                package_set.push(package);
1210            } else {
1211                federated_packages.insert(registry_domain, vec![package]);
1212            }
1213        }
1214
1215        while let Some((registry_domain, packages)) = federated_packages.pop() {
1216            for (registry_domain, packages) in self
1217                .update_packages_and_return_federated_packages(registry_domain.as_ref(), packages)
1218                .await?
1219                .into_iter()
1220            {
1221                if let Some(package_set) = federated_packages.get_mut(&registry_domain) {
1222                    package_set.extend(packages);
1223                } else {
1224                    federated_packages.insert(registry_domain, packages);
1225                }
1226            }
1227        }
1228
1229        Ok(())
1230    }
1231
1232    /// Fetches package logs without checking local storage first.
1233    pub async fn fetch_packages(
1234        &self,
1235        names: impl IntoIterator<Item = &PackageName>,
1236    ) -> Result<Vec<PackageInfo>, ClientError> {
1237        let mut packages: Vec<PackageInfo> = names
1238            .into_iter()
1239            .map(|name| PackageInfo::new(name.clone()))
1240            .collect();
1241        self.update_checkpoints(packages.iter_mut()).await?;
1242        Ok(packages)
1243    }
1244
1245    /// Fetches the `PackageInfo` without checking local storage first.
1246    pub async fn fetch_package(&self, name: &PackageName) -> Result<PackageInfo, ClientError> {
1247        let mut info = PackageInfo::new(name.clone());
1248        self.update_checkpoints([&mut info]).await?;
1249        Ok(info)
1250    }
1251
1252    /// Retrieves the `PackageInfo` from local storage, if present, otherwise fetches from the
1253    /// registry.
1254    pub async fn package(&self, name: &PackageName) -> Result<PackageInfo, ClientError> {
1255        let registry_domain = self.get_warg_registry(name.namespace()).await?;
1256        match self
1257            .registry
1258            .load_package(registry_domain.as_ref(), name)
1259            .await?
1260        {
1261            Some(mut info) => {
1262                tracing::info!("log for package `{name}` already exists in storage");
1263                if info.registry.is_none() {
1264                    info.registry = registry_domain
1265                        .clone()
1266                        .or_else(|| Some(self.url().registry_domain()));
1267                }
1268                Ok(info)
1269            }
1270            None => {
1271                let mut info = PackageInfo::new(name.clone());
1272                self.update_checkpoints([&mut info]).await?;
1273                Ok(info)
1274            }
1275        }
1276    }
1277
1278    async fn get_package_record(
1279        &self,
1280        registry_domain: Option<&RegistryDomain>,
1281        package: &PackageName,
1282        log_id: &LogId,
1283        record_id: &RecordId,
1284    ) -> ClientResult<PackageRecord> {
1285        let record = self
1286            .api
1287            .get_package_record(registry_domain, log_id, record_id)
1288            .await
1289            .map_err(|e| match e {
1290                api::ClientError::Package(PackageError::Rejection(reason)) => {
1291                    ClientError::PublishRejected {
1292                        name: package.clone(),
1293                        reason,
1294                        record_id: record_id.clone(),
1295                    }
1296                }
1297                e => {
1298                    ClientError::translate_log_not_found(e, self.api.auth_token().is_some(), |id| {
1299                        if id == log_id {
1300                            Some(package.clone())
1301                        } else {
1302                            None
1303                        }
1304                    })
1305                }
1306            })?;
1307        Ok(record)
1308    }
1309
1310    /// Downloads the content for the specified digest into client storage.
1311    ///
1312    /// If the content already exists in client storage, the existing path
1313    /// is returned.
1314    async fn download_content(
1315        &self,
1316        registry_domain: Option<&RegistryDomain>,
1317        digest: &AnyHash,
1318    ) -> Result<PathBuf, ClientError> {
1319        match self.content.content_location(digest) {
1320            Some(path) => {
1321                tracing::info!("content for digest `{digest}` already exists in storage");
1322                Ok(path)
1323            }
1324            None => {
1325                self.content
1326                    .store_content(
1327                        Box::pin(self.api.download_content(registry_domain, digest).await?),
1328                        Some(digest),
1329                    )
1330                    .await?;
1331
1332                self.content
1333                    .content_location(digest)
1334                    .ok_or_else(|| ClientError::ContentNotFound {
1335                        digest: digest.clone(),
1336                    })
1337            }
1338        }
1339    }
1340
1341    /// Downloads the content for the specified digest as a stream.
1342    ///
1343    /// If the content already exists in client storage, it is read from the client storage.
1344    ///
1345    /// The download is not stored in client storage.
1346    async fn download_content_stream(
1347        &self,
1348        registry_domain: Option<&RegistryDomain>,
1349        digest: &AnyHash,
1350    ) -> Result<impl Stream<Item = Result<Bytes>>, ClientError> {
1351        match self.content.content_location(digest) {
1352            Some(path) => {
1353                tracing::info!("content for digest `{digest}` already exists in storage");
1354                let file = tokio::fs::File::open(path)
1355                    .await
1356                    .map_err(ClientError::IoError)?;
1357                Ok(ReaderStream::new(file).map_err(Into::into).boxed())
1358            }
1359            None => Ok(Box::pin(
1360                self.api.download_content(registry_domain, digest).await?,
1361            )),
1362        }
1363    }
1364}
1365/// A Warg registry client that uses the local file system to store
1366/// package logs and content.
1367pub type FileSystemClient =
1368    Client<FileSystemRegistryStorage, FileSystemContentStorage, FileSystemNamespaceMapStorage>;
1369
1370/// A result of an attempt to lock client storage.
1371pub enum StorageLockResult<T> {
1372    /// The storage lock was acquired.
1373    Acquired(T),
1374    /// The storage lock was not acquired for the specified directory.
1375    NotAcquired(PathBuf),
1376}
1377
1378impl FileSystemClient {
1379    async fn storage_paths(
1380        url: Option<&str>,
1381        config: &Config,
1382        disable_interactive: bool,
1383    ) -> Result<StoragePaths, ClientError> {
1384        let checking_url_for_well_known = RegistryUrl::new(
1385            url.or(config.home_url.as_deref())
1386                .unwrap_or(DEFAULT_REGISTRY),
1387        )?;
1388
1389        let url = if let Some(warg_url) =
1390            api::Client::new(checking_url_for_well_known.to_string(), None)?
1391                .well_known_config()
1392                .await?
1393        {
1394            if !disable_interactive && warg_url != checking_url_for_well_known {
1395                println!(
1396                    "Resolved `{well_known}` to registry hosted on `{registry}`",
1397                    well_known = checking_url_for_well_known.registry_domain(),
1398                    registry = warg_url.registry_domain(),
1399                );
1400            }
1401            warg_url
1402        } else {
1403            RegistryUrl::new(
1404                url.or(config.home_url.as_deref())
1405                    .ok_or(ClientError::NoHomeRegistryUrl)?,
1406            )?
1407        };
1408
1409        config.storage_paths_for_url(url)
1410    }
1411
1412    /// Attempts to create a client for the given registry URL.
1413    ///
1414    /// If the URL is `None`, the home registry URL is used; if there is no home registry
1415    /// URL, an error is returned.
1416    ///
1417    /// If a lock cannot be acquired for a storage directory, then
1418    /// `NewClientResult::Blocked` is returned with the path to the
1419    /// directory that could not be locked.
1420    pub async fn try_new_with_config(
1421        registry: Option<&str>,
1422        config: &Config,
1423        mut auth_token: Option<Secret<String>>,
1424    ) -> Result<StorageLockResult<Self>, ClientError> {
1425        let disable_interactive =
1426            cfg!(not(feature = "cli-interactive")) || config.disable_interactive;
1427
1428        let StoragePaths {
1429            registry_url: url,
1430            registries_dir,
1431            content_dir,
1432            namespace_map_path,
1433        } = Self::storage_paths(registry, config, disable_interactive).await?;
1434
1435        let (keyring_backend, keys) = if cfg!(feature = "keyring") {
1436            (config.keyring_backend.clone(), config.keys.clone())
1437        } else {
1438            (None, IndexSet::new())
1439        };
1440
1441        #[cfg(feature = "keyring")]
1442        if auth_token.is_none() && config.keyring_auth {
1443            auth_token = crate::keyring::Keyring::from_config(config)?.get_auth_token(&url)?
1444        }
1445
1446        let (packages, content, namespace_map) = match (
1447            FileSystemRegistryStorage::try_lock(registries_dir.clone())?,
1448            FileSystemContentStorage::try_lock(content_dir.clone())?,
1449            FileSystemNamespaceMapStorage::new(namespace_map_path.clone()),
1450        ) {
1451            (Some(packages), Some(content), namespace_map) => (packages, content, namespace_map),
1452            (None, _, _) => return Ok(StorageLockResult::NotAcquired(registries_dir)),
1453            (_, None, _) => return Ok(StorageLockResult::NotAcquired(content_dir)),
1454        };
1455
1456        Ok(StorageLockResult::Acquired(Self::new(
1457            url.into_url(),
1458            packages,
1459            content,
1460            namespace_map,
1461            auth_token,
1462            config.ignore_federation_hints,
1463            config.disable_auto_accept_federation_hints,
1464            config.disable_auto_package_init,
1465            disable_interactive,
1466            keyring_backend,
1467            keys,
1468        )?))
1469    }
1470
1471    /// Attempts to create a client for the given registry URL.
1472    ///
1473    /// If the URL is `None`, the home registry URL is used; if there is no home registry
1474    /// URL, an error is returned.
1475    ///
1476    /// If a lock cannot be acquired for a storage directory, then
1477    /// `NewClientResult::Blocked` is returned with the path to the
1478    /// directory that could not be locked.
1479    ///
1480    /// Same as calling `try_new_with_config` with
1481    /// `Config::from_default_file()?.unwrap_or_default()`.
1482    pub async fn try_new_with_default_config(
1483        url: Option<&str>,
1484    ) -> Result<StorageLockResult<Self>, ClientError> {
1485        Self::try_new_with_config(url, &Config::from_default_file()?.unwrap_or_default(), None)
1486            .await
1487    }
1488
1489    /// Creates a client for the given registry URL.
1490    ///
1491    /// If the URL is `None`, the home registry URL is used; if there is no home registry
1492    /// URL, an error is returned.
1493    ///
1494    /// This method blocks if storage locks cannot be acquired.
1495    pub async fn new_with_config(
1496        registry: Option<&str>,
1497        config: &Config,
1498        mut auth_token: Option<Secret<String>>,
1499    ) -> Result<Self, ClientError> {
1500        let disable_interactive =
1501            cfg!(not(feature = "cli-interactive")) || config.disable_interactive;
1502
1503        let StoragePaths {
1504            registry_url: url,
1505            registries_dir,
1506            content_dir,
1507            namespace_map_path,
1508        } = Self::storage_paths(registry, config, disable_interactive).await?;
1509
1510        let (keyring_backend, keys) = if cfg!(feature = "keyring") {
1511            (config.keyring_backend.clone(), config.keys.clone())
1512        } else {
1513            (None, IndexSet::new())
1514        };
1515
1516        #[cfg(feature = "keyring")]
1517        if auth_token.is_none() && config.keyring_auth {
1518            auth_token = crate::keyring::Keyring::from_config(config)?.get_auth_token(&url)?
1519        }
1520
1521        Self::new(
1522            url.into_url(),
1523            FileSystemRegistryStorage::lock(registries_dir)?,
1524            FileSystemContentStorage::lock(content_dir)?,
1525            FileSystemNamespaceMapStorage::new(namespace_map_path),
1526            auth_token,
1527            config.ignore_federation_hints,
1528            config.disable_auto_accept_federation_hints,
1529            config.disable_auto_package_init,
1530            disable_interactive,
1531            keyring_backend,
1532            keys,
1533        )
1534    }
1535
1536    /// Creates a client for the given registry URL.
1537    ///
1538    /// If the URL is `None`, the home registry URL is used; if there is no home registry
1539    /// URL, an error is returned.
1540    ///
1541    /// This method blocks if storage locks cannot be acquired.
1542    ///
1543    /// Same as calling `new_with_config` with
1544    /// `Config::from_default_file()?.unwrap_or_default()`.
1545    pub async fn new_with_default_config(url: Option<&str>) -> Result<Self, ClientError> {
1546        Self::new_with_config(url, &Config::from_default_file()?.unwrap_or_default(), None).await
1547    }
1548}
1549
1550/// Represents information about a downloaded package.
1551#[derive(Debug, Clone)]
1552pub struct PackageDownload {
1553    /// The package version that was downloaded.
1554    pub version: Version,
1555    /// The digest of the package contents.
1556    pub digest: AnyHash,
1557    /// The path to the downloaded package contents.
1558    pub path: PathBuf,
1559}
1560
1561/// Represents information about a downloaded package.
1562pub struct PackageDownloadInfo {
1563    /// The package version that was downloaded.
1564    pub version: Version,
1565    /// The digest of the package contents.
1566    pub digest: AnyHash,
1567}
1568
1569/// Represents an error returned by Warg registry clients.
1570#[derive(Debug, Error)]
1571pub enum ClientError {
1572    /// No home registry registry server URL is configured.
1573    #[error("no home registry registry server URL is configured")]
1574    NoHomeRegistryUrl,
1575
1576    /// Reset registry local state.
1577    #[error("reset registry state failed")]
1578    ResettingRegistryLocalStateFailed,
1579
1580    /// Clearing content local cache.
1581    #[error("clear content cache failed")]
1582    ClearContentCacheFailed,
1583
1584    /// Unauthorized rejection
1585    #[error("unauthorized: {0}")]
1586    Unauthorized(String),
1587
1588    /// Checkpoint signature failed verification
1589    #[error("invalid checkpoint signature")]
1590    InvalidCheckpointSignature,
1591
1592    /// Checkpoint signature failed verification
1593    #[error("invalid checkpoint key ID `{key_id}`")]
1594    InvalidCheckpointKeyId {
1595        /// The signature key ID.
1596        key_id: signing::KeyID,
1597    },
1598
1599    /// The server did not provide operator records.
1600    #[error("the server did not provide any operator records")]
1601    NoOperatorRecords,
1602
1603    /// The operator failed validation.
1604    #[error("operator failed validation: {inner}")]
1605    OperatorValidationFailed {
1606        /// The validation error.
1607        inner: operator::ValidationError,
1608    },
1609
1610    /// The package already exists and cannot be initialized.
1611    #[error("package `{name}` already exists and cannot be initialized")]
1612    CannotInitializePackage {
1613        /// The package name that already exists.
1614        name: PackageName,
1615        /// The record identifier for the init record.
1616        init_record_id: Option<RecordId>,
1617    },
1618
1619    /// The package must be initialized before publishing.
1620    #[error("package `{name}` must be initialized before publishing")]
1621    MustInitializePackage {
1622        /// The name of the package that must be initialized.
1623        name: PackageName,
1624        /// Client has authentication credentials.
1625        has_auth_token: bool,
1626    },
1627
1628    /// There is no publish operation in progress.
1629    #[error("there is no publish operation in progress")]
1630    NotPublishing,
1631
1632    /// The package has no records to publish.
1633    #[error("package `{name}` has no records to publish")]
1634    NothingToPublish {
1635        /// The package that has no publish operations.
1636        name: PackageName,
1637    },
1638
1639    /// The package does not exist.
1640    #[error("package `{name}` does not exist")]
1641    PackageDoesNotExist {
1642        /// The missing package.
1643        name: PackageName,
1644        /// Client has authentication credentials.
1645        has_auth_token: bool,
1646    },
1647
1648    /// The package does not exist with hint header.
1649    #[error("package `{name}` does not exist but the registry suggests checking registry `{hint_registry}` for packages in namespace `{hint_namespace}`")]
1650    PackageDoesNotExistWithHintHeader {
1651        /// The missing package.
1652        name: PackageName,
1653        /// Client has authentication credentials.
1654        has_auth_token: bool,
1655        /// The hint namespace.
1656        hint_namespace: String,
1657        /// The hint registry.
1658        hint_registry: String,
1659    },
1660
1661    /// The package version does not exist.
1662    #[error("version `{version}` of package `{name}` does not exist")]
1663    PackageVersionDoesNotExist {
1664        /// The missing version of the package.
1665        version: Version,
1666        /// The package with the missing version.
1667        name: PackageName,
1668    },
1669
1670    /// The package version requirement does not exist.
1671    #[error("version that satisfies requirement `{version}` was not found for package `{name}`")]
1672    PackageVersionRequirementDoesNotExist {
1673        /// The missing version requirement of the package.
1674        version: VersionReq,
1675        /// The package with the missing version.
1676        name: PackageName,
1677    },
1678
1679    /// The package failed validation.
1680    #[error("package `{name}` failed validation: {inner}")]
1681    PackageValidationFailed {
1682        /// The package that failed validation.
1683        name: PackageName,
1684        /// The validation error.
1685        inner: package::ValidationError,
1686    },
1687
1688    /// Content was not found during a publish operation.
1689    #[error("content with digest `{digest}` was not found in client storage")]
1690    ContentNotFound {
1691        /// The digest of the missing content.
1692        digest: AnyHash,
1693    },
1694
1695    /// Content digest was different than expected.
1696    #[error("content with digest `{digest}` was not found expected `{expected}`")]
1697    IncorrectContent {
1698        /// The digest of the missing content.
1699        digest: AnyHash,
1700        /// The expected
1701        expected: AnyHash,
1702    },
1703
1704    /// The package log is empty and cannot be validated.
1705    #[error("package log is empty and cannot be validated")]
1706    PackageLogEmpty {
1707        /// The package with an empty package log.
1708        name: PackageName,
1709    },
1710
1711    /// A publish operation was rejected.
1712    #[error("the publishing of package `{name}` was rejected due to: {reason}")]
1713    PublishRejected {
1714        /// The package that was rejected.
1715        name: PackageName,
1716        /// The record identifier for the record that was rejected.
1717        record_id: RecordId,
1718        /// The reason it was rejected.
1719        reason: String,
1720    },
1721
1722    /// A publish operation was rejected due to conflicting pending publish.
1723    #[error("the publishing of package `{name}` was rejected due to conflicting pending publish of record `{pending_record_id}`")]
1724    ConflictPendingPublish {
1725        /// The package that was rejected.
1726        name: PackageName,
1727        /// The record identifier for the record that was rejected.
1728        record_id: RecordId,
1729        /// The record identifier for the pending publish record.
1730        pending_record_id: RecordId,
1731    },
1732
1733    /// The package is still missing content.
1734    #[error("the package is still missing content after all content was uploaded")]
1735    PackageMissingContent,
1736
1737    /// The registry provided a latest checkpoint with a log length less than a previously provided
1738    /// checkpoint log length.
1739    #[error("registry rewinded checkpoints; latest checkpoint log length `{to}` is less than previously received checkpoint log length `{from}`")]
1740    CheckpointLogLengthRewind {
1741        /// The previously received checkpoint log length.
1742        from: RegistryLen,
1743        /// The latest checkpoint log length.
1744        to: RegistryLen,
1745    },
1746
1747    /// The registry provided a checkpoint with a different `log_root` and
1748    /// `map_root` than a previously provided checkpoint.
1749    #[error("registry provided a new checkpoint with the same log length `{log_length}` as previously fetched but different log root or map root")]
1750    CheckpointChangedLogRootOrMapRoot {
1751        /// The checkpoint log length.
1752        log_length: RegistryLen,
1753    },
1754
1755    /// An error occurred while accessing the keyring.
1756    #[error(transparent)]
1757    Keyring(#[from] crate::keyring::KeyringError),
1758
1759    /// An error occurred during an API operation.
1760    #[error(transparent)]
1761    Api(#[from] api::ClientError),
1762
1763    /// An error occurred while performing a client operation.
1764    #[error("{0:?}")]
1765    Other(#[from] anyhow::Error),
1766
1767    /// An error occurred while performing a IO.
1768    #[error("error: {0:?}")]
1769    IoError(#[from] std::io::Error),
1770}
1771
1772impl ClientError {
1773    fn translate_log_not_found(
1774        e: api::ClientError,
1775        has_auth_token: bool,
1776        lookup: impl Fn(&LogId) -> Option<PackageName>,
1777    ) -> Self {
1778        match &e {
1779            api::ClientError::Fetch(FetchError::LogNotFound(id))
1780            | api::ClientError::Package(PackageError::LogNotFound(id)) => {
1781                if let Some(name) = lookup(id) {
1782                    return Self::PackageDoesNotExist {
1783                        name,
1784                        has_auth_token,
1785                    };
1786                }
1787            }
1788            _ => {}
1789        }
1790
1791        Self::Api(e)
1792    }
1793}
1794
1795/// Represents the result of a client operation.
1796pub type ClientResult<T> = Result<T, ClientError>;