1#![deny(missing_docs)]
4use crate::storage::PackageInfo;
5
6use anyhow::{anyhow, Context, Result};
7use bytes::Bytes;
8use futures_util::{Stream, StreamExt, TryStreamExt};
9use indexmap::{IndexMap, IndexSet};
10use reqwest::{Body, IntoUrl};
11use secrecy::Secret;
12use semver::{Version, VersionReq};
13use std::cmp::Ordering;
14use std::fs;
15use std::str::FromStr;
16use std::{borrow::Cow, path::PathBuf, time::Duration};
17use storage::{
18 ContentStorage, FileSystemContentStorage, FileSystemNamespaceMapStorage,
19 FileSystemRegistryStorage, NamespaceMapStorage, PublishInfo, RegistryDomain, RegistryStorage,
20};
21use thiserror::Error;
22use tokio_util::io::ReaderStream;
23use warg_api::v1::{
24 fetch::{FetchError, FetchLogsRequest},
25 package::{
26 MissingContent, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest,
27 UploadEndpoint,
28 },
29 proof::{ConsistencyRequest, InclusionRequest},
30};
31use warg_crypto::hash::Sha256;
32use warg_crypto::{hash::AnyHash, signing, Encode, Signable};
33use warg_protocol::package::ReleaseState;
34use warg_protocol::{
35 operator, package,
36 registry::{LogId, LogLeaf, PackageName, RecordId, RegistryLen, TimestampedCheckpoint},
37 PublishedProtoEnvelope,
38};
39use wasm_compose::graph::{CompositionGraph, EncodeOptions, ExportIndex, InstanceId};
40
41#[cfg(feature = "keyring")]
42pub mod keyring;
43
44pub mod api;
45mod config;
46pub mod depsolve;
48use depsolve::{Bundler, LockListBuilder};
49pub mod version_util;
51use version_util::{kindless_name, locked_package, versioned_package, Import, ImportKind};
52pub mod lock;
53mod registry_url;
54pub mod storage;
55pub use self::config::*;
56pub use self::registry_url::RegistryUrl;
57
58const DEFAULT_WAIT_INTERVAL: Duration = Duration::from_secs(1);
59
60pub const DEFAULT_REGISTRY: &str = "bytecodealliance.org";
63
64pub struct Client<R, C, N>
66where
67 R: RegistryStorage,
68 C: ContentStorage,
69 N: NamespaceMapStorage,
70{
71 registry: R,
72 content: C,
73 namespace_map: N,
74 api: api::Client,
75 ignore_federation_hints: bool,
76 disable_auto_accept_federation_hints: bool,
77 disable_auto_package_init: bool,
78 disable_interactive: bool,
79 keyring_backend: Option<String>,
80 keys: IndexSet<String>,
81}
82
83impl<R: RegistryStorage, C: ContentStorage, N: NamespaceMapStorage> Client<R, C, N> {
84 #[allow(clippy::too_many_arguments)]
87 pub fn new(
88 url: impl IntoUrl,
89 registry: R,
90 content: C,
91 namespace_map: N,
92 auth_token: Option<Secret<String>>,
93 ignore_federation_hints: bool,
94 disable_auto_accept_federation_hints: bool,
95 disable_auto_package_init: bool,
96 disable_interactive: bool,
97 keyring_backend: Option<String>,
98 keys: IndexSet<String>,
99 ) -> ClientResult<Self> {
100 let api = api::Client::new(url, auth_token)?;
101 Ok(Self {
102 registry,
103 content,
104 namespace_map,
105 api,
106 ignore_federation_hints,
107 disable_auto_accept_federation_hints,
108 disable_auto_package_init,
109 disable_interactive,
110 keyring_backend,
111 keys,
112 })
113 }
114
115 pub fn url(&self) -> &RegistryUrl {
117 self.api.url()
118 }
119
120 pub fn registry(&self) -> &R {
122 &self.registry
123 }
124
125 pub fn content(&self) -> &C {
127 &self.content
128 }
129
130 pub fn namespace_map(&self) -> &N {
132 &self.namespace_map
133 }
134
135 pub async fn get_warg_registry(
137 &self,
138 namespace: &str,
139 ) -> Result<Option<RegistryDomain>, ClientError> {
140 let operator = self
141 .registry()
142 .load_operator(Some(&RegistryDomain::from_str(namespace)?))
143 .await?;
144 if let Some(op) = operator {
145 match op.state.namespace_state(namespace) {
146 Some(warg_protocol::operator::NamespaceState::Imported { registry }) => {
147 return Ok(Some(RegistryDomain::from_str(registry)?));
148 }
149 Some(warg_protocol::operator::NamespaceState::Defined) => {
150 return Ok(None);
151 }
152 _ => (),
153 }
154 };
155 let nm_map = self.namespace_map.load_namespace_map().await?;
156 Ok(nm_map.and_then(|nm_map| {
157 nm_map
158 .get(namespace)
159 .map(|domain| RegistryDomain::from_str(domain).unwrap())
160 }))
161 }
162
163 pub async fn store_namespace(
165 &self,
166 namespace: String,
167 registry_domain: RegistryDomain,
168 ) -> Result<()> {
169 self.namespace_map
170 .store_namespace(namespace, registry_domain)
171 .await?;
172 Ok(())
173 }
174
175 pub async fn reset_namespaces(&self) -> Result<()> {
177 self.namespace_map.reset_namespaces().await?;
178 Ok(())
179 }
180
181 pub async fn reset_registry(&self) -> ClientResult<()> {
183 tracing::info!("resetting registry local state");
184 self.registry
185 .reset(true)
186 .await
187 .or(Err(ClientError::ResettingRegistryLocalStateFailed))
188 }
189
190 pub async fn clear_content_cache(&self) -> ClientResult<()> {
192 tracing::info!("removing content cache");
193 self.content
194 .clear()
195 .await
196 .or(Err(ClientError::ClearContentCacheFailed))
197 }
198
199 pub async fn lock_component(&self, info: &PackageInfo) -> ClientResult<Vec<u8>> {
201 let mut builder = LockListBuilder::default();
202 builder.build_list(self, info).await?;
203 let top = Import {
204 name: format!("{}:{}", info.name.namespace(), info.name.name()),
205 req: VersionReq::STAR,
206 kind: ImportKind::Unlocked,
207 };
208 builder.lock_list.insert(top);
209 let mut composer = CompositionGraph::new();
210 let mut handled = IndexMap::<String, InstanceId>::new();
211 for package in builder.lock_list {
212 let name = package.name.clone();
213 let version = package.req;
214 let id = PackageName::new(name)?;
215 let info = self
216 .registry()
217 .load_package(self.get_warg_registry(id.namespace()).await?.as_ref(), &id)
218 .await?;
219 if let Some(inf) = info {
220 let release = if version != VersionReq::STAR {
221 inf.state
222 .releases()
223 .filter(|r| version.matches(&r.version))
224 .last()
225 } else {
226 inf.state.releases().last()
227 };
228
229 if let Some(r) = release {
230 let state = &r.state;
231 if let ReleaseState::Released { content } = state {
232 let locked_package = locked_package(&package.name, r, content);
233 let path = self.content().content_location(content);
234 if let Some(p) = path {
235 let bytes = fs::read(&p).map_err(|_| ClientError::ContentNotFound {
236 digest: content.clone(),
237 })?;
238
239 let read_digest =
240 AnyHash::from_str(&format!("sha256:{}", sha256::digest(bytes)))
241 .unwrap();
242 if content != &read_digest {
243 return Err(ClientError::IncorrectContent {
244 digest: read_digest,
245 expected: content.clone(),
246 });
247 }
248 let component =
249 wasm_compose::graph::Component::from_file(&locked_package, p)?;
250 let component_id = if let Some((id, _)) =
251 composer.get_component_by_name(&locked_package)
252 {
253 id
254 } else {
255 composer.add_component(component)?
256 };
257 let instance_id = composer.instantiate(component_id)?;
258 let added = composer.get_component(component_id);
259 handled.insert(versioned_package(&package.name, version), instance_id);
260 let mut args = Vec::new();
261 if let Some(added) = added {
262 for (index, name, _) in added.imports() {
263 let iid = handled.get(kindless_name(name));
264 if let Some(arg) = iid {
265 args.push((arg, index));
266 }
267 }
268 }
269 for arg in args {
270 composer.connect(
271 *arg.0,
272 None::<ExportIndex>,
273 instance_id,
274 arg.1,
275 )?;
276 }
277 }
278 }
279 }
280 }
281 }
282 let final_name = &format!("{}:{}", info.name.namespace(), &info.name.name());
283 let id = handled.get(final_name);
284 let options = EncodeOptions {
285 export: id.copied(),
286 ..Default::default()
287 };
288 let locked = composer.encode(options)?;
289 fs::write("./locked.wasm", locked.as_slice()).map_err(|e| ClientError::Other(e.into()))?;
290 Ok(locked)
291 }
292
293 pub async fn bundle_component(&self, info: &PackageInfo) -> ClientResult<Vec<u8>> {
295 let mut bundler = Bundler::new(self);
296 let path = PathBuf::from("./locked.wasm");
297 let locked = if !path.is_file() {
298 self.lock_component(info).await?
299 } else {
300 fs::read("./locked.wasm").map_err(|e| ClientError::Other(e.into()))?
301 };
302 let bundled = bundler.parse(&locked).await?;
303 fs::write("./bundled.wasm", bundled.as_slice())
304 .map_err(|e| ClientError::Other(e.into()))?;
305 Ok(bundled.as_slice().to_vec())
306 }
307
308 pub async fn publish(&self, signing_key: &signing::PrivateKey) -> ClientResult<RecordId> {
316 let info = self
317 .registry
318 .load_publish()
319 .await?
320 .ok_or(ClientError::NotPublishing)?;
321
322 let res = self.publish_with_info(signing_key, info).await;
323 self.registry.store_publish(None).await?;
324 res
325 }
326
327 #[cfg(feature = "keyring")]
336 pub async fn sign_with_keyring_and_publish(
337 &self,
338 publish_info: Option<PublishInfo>,
339 ) -> ClientResult<RecordId> {
340 let publish_info = if let Some(publish_info) = publish_info {
341 publish_info
342 } else {
343 self.registry
344 .load_publish()
345 .await?
346 .ok_or(ClientError::NotPublishing)?
347 };
348
349 let registry_domain = self
350 .get_warg_registry(publish_info.name.namespace())
351 .await?;
352 let signing_key = keyring::Keyring::new(
353 self.keyring_backend
354 .as_deref()
355 .unwrap_or(keyring::Keyring::DEFAULT_BACKEND),
356 )?
357 .get_signing_key(
358 registry_domain.map(|domain| domain.to_string()).as_deref(),
359 &self.keys,
360 Some(&self.url().to_string()),
361 )?;
362
363 let res = self.publish_with_info(&signing_key, publish_info).await;
364 self.registry.store_publish(None).await?;
365 res
366 }
367
368 pub async fn publish_with_info(
376 &self,
377 signing_key: &signing::PrivateKey,
378 publish_info: PublishInfo,
379 ) -> ClientResult<RecordId> {
380 if publish_info.entries.is_empty() {
381 return Err(ClientError::NothingToPublish {
382 name: publish_info.name.clone(),
383 });
384 }
385
386 tracing::info!(
387 "publishing {new}package `{name}`",
388 name = publish_info.name,
389 new = if publish_info.initializing() {
390 "new "
391 } else {
392 ""
393 }
394 );
395 tracing::debug!("entries: {:?}", publish_info.entries);
396
397 let mut accepted_prompt_to_initialize = false;
398
399 let mut init_record_id: Option<RecordId> = None;
400
401 let (package, record) = loop {
402 let mut info = publish_info.clone();
403
404 let mut initializing = info.initializing();
405
406 let package = match self.fetch_package(&info.name).await {
407 Ok(package) => {
408 if initializing {
409 return Err(ClientError::CannotInitializePackage {
410 name: package.name,
411 init_record_id,
412 });
413 } else if info.head.is_none() {
414 info.head = package.state.head().as_ref().map(|h| h.digest.clone());
417 }
418 package
419 }
420 Err(ClientError::PackageDoesNotExist {
421 name,
422 has_auth_token,
423 }) => {
424 if !initializing {
425 if !self.disable_auto_package_init {
426 info.entries.insert(0, crate::storage::PublishEntry::Init);
427 initializing = true;
428 accepted_prompt_to_initialize = true;
429 } else {
430 if self.disable_interactive || cfg!(not(feature = "cli-interactive")) {
431 return Err(ClientError::MustInitializePackage {
432 name,
433 has_auth_token,
434 });
435 }
436
437 #[cfg(feature = "cli-interactive")]
438 {
439 use crate::storage::PublishEntry;
440 use dialoguer::{theme::ColorfulTheme, Confirm};
441
442 if accepted_prompt_to_initialize
443 || Confirm::with_theme(&ColorfulTheme::default())
444 .with_prompt(format!(
445 "Package `{package_name}` was not found.
446 If it exists, you may not have access.
447 Attempt to create `{package_name}` and publish the release y/N\n",
448 package_name = &info.name,
449 ))
450 .default(false)
451 .interact()
452 .unwrap()
453 {
454 info.entries.insert(0, PublishEntry::Init);
455 initializing = true;
456 accepted_prompt_to_initialize = true;
457 } else {
458 return Err(ClientError::MustInitializePackage {
459 name,
460 has_auth_token,
461 });
462 }
463 }
464 }
465 }
466 PackageInfo::new(info.name.clone())
467 }
468 err => err?,
469 };
470 let registry_domain = self.get_warg_registry(package.name.namespace()).await?;
471
472 let log_id = LogId::package_log::<Sha256>(&package.name);
473 let record = info.finalize(signing_key)?;
474 let record_id = RecordId::package_record::<Sha256>(&record);
475 let record = match self
476 .api
477 .publish_package_record(
478 registry_domain.as_ref(),
479 &log_id,
480 PublishRecordRequest {
481 package_name: Cow::Borrowed(&package.name),
482 record: Cow::Owned(record.into()),
483 content_sources: Default::default(),
484 },
485 )
486 .await
487 {
488 Ok(record) => Ok(record),
489 Err(api::ClientError::Package(PackageError::Rejection(reason))) => {
490 Err(ClientError::PublishRejected {
491 name: package.name.clone(),
492 reason,
493 record_id,
494 })
495 }
496 Err(api::ClientError::Package(PackageError::Unauthorized(reason))) => {
497 Err(ClientError::Unauthorized(reason))
498 }
499 Err(api::ClientError::Package(PackageError::ConflictPendingPublish(
500 pending_record_id,
501 ))) => {
502 tracing::info!("waiting for conflicting publish to complete");
504 if initializing {
506 match self.fetch_package(&package.name).await {
507 Ok(_) => {}
508 Err(ClientError::PackageDoesNotExist { .. }) => {}
510 Err(err) => return Err(err),
511 }
512 init_record_id = Some(pending_record_id.clone());
513 }
514 self.wait_for_publish(&package.name, &pending_record_id, DEFAULT_WAIT_INTERVAL)
515 .await
516 .map_err(|err| match err {
517 ClientError::PackageMissingContent => {
518 ClientError::ConflictPendingPublish {
519 name: package.name.clone(),
520 record_id,
521 pending_record_id,
522 }
523 }
524 err => err,
525 })?;
526
527 continue;
528 }
529 Err(e) => Err(ClientError::translate_log_not_found(
530 e,
531 self.api.auth_token().is_some(),
532 |id| {
533 if id == &log_id {
534 Some(package.name.clone())
535 } else {
536 None
537 }
538 },
539 )),
540 }?;
541
542 break (package, record);
543 };
544
545 for (digest, MissingContent { upload }) in record.missing_content() {
547 let Some(UploadEndpoint::Http {
549 method,
550 url,
551 headers,
552 }) = upload.first()
553 else {
554 continue;
555 };
556
557 self.api
558 .upload_content(
559 method,
560 url,
561 headers,
562 Body::wrap_stream(self.content.load_content(digest).await?.ok_or_else(
563 || ClientError::ContentNotFound {
564 digest: digest.clone(),
565 },
566 )?),
567 )
568 .await
569 .map_err(|e| match e {
570 api::ClientError::Package(PackageError::Rejection(reason)) => {
571 ClientError::PublishRejected {
572 name: package.name.clone(),
573 record_id: record.record_id.clone(),
574 reason,
575 }
576 }
577 api::ClientError::Package(PackageError::Unauthorized(reason)) => {
578 ClientError::Unauthorized(reason)
579 }
580 _ => e.into(),
581 })?;
582 }
583
584 Ok(record.record_id)
585 }
586
587 pub async fn wait_for_publish(
593 &self,
594 package: &PackageName,
595 record_id: &RecordId,
596 interval: Duration,
597 ) -> ClientResult<()> {
598 let registry_domain = self.get_warg_registry(package.namespace()).await?;
599 let log_id = LogId::package_log::<Sha256>(package);
600 let mut current = self
601 .get_package_record(registry_domain.as_ref(), package, &log_id, record_id)
602 .await?;
603
604 loop {
605 match current.state {
606 PackageRecordState::Sourcing { .. } => {
607 return Err(ClientError::PackageMissingContent);
608 }
609 PackageRecordState::Published { .. } => {
610 self.fetch_package(package).await?;
611 return Ok(());
612 }
613 PackageRecordState::Rejected { reason } => {
614 return Err(ClientError::PublishRejected {
615 name: package.clone(),
616 record_id: record_id.clone(),
617 reason,
618 });
619 }
620 PackageRecordState::Processing => {
621 tokio::time::sleep(interval).await;
622 current = self
623 .get_package_record(registry_domain.as_ref(), package, &log_id, record_id)
624 .await?;
625 }
626 }
627 }
628 }
629
630 pub async fn update(&self) -> ClientResult<()> {
632 tracing::info!("updating downloaded package logs");
633
634 for mut packages in self.registry.load_all_packages().await?.into_values() {
635 self.update_checkpoints(&mut packages).await?;
636 }
637
638 Ok(())
639 }
640
641 pub async fn download(
655 &self,
656 package: &PackageName,
657 requirement: &VersionReq,
658 ) -> Result<Option<PackageDownload>, ClientError> {
659 let info = self.package(package).await?;
660
661 let registry_domain = self.get_warg_registry(package.namespace()).await?;
662
663 tracing::debug!(
664 package = package.as_ref(),
665 version_requirement = requirement.to_string(),
666 registry_header = ?registry_domain,
667 "downloading",
668 );
669
670 match info.state.find_latest_release(requirement) {
671 Some(release) => {
672 let digest = release
673 .content()
674 .context("invalid state: not yanked but missing content")?
675 .clone();
676 let path = self
677 .download_content(registry_domain.as_ref(), &digest)
678 .await?;
679 Ok(Some(PackageDownload {
680 version: release.version.clone(),
681 digest,
682 path,
683 }))
684 }
685 None => Ok(None),
686 }
687 }
688
689 pub async fn download_as_stream(
699 &self,
700 package: &PackageName,
701 requirement: &VersionReq,
702 ) -> Result<Option<(PackageDownloadInfo, impl Stream<Item = Result<Bytes>>)>, ClientError> {
703 let info = self.package(package).await?;
704
705 let registry_domain = self.get_warg_registry(package.namespace()).await?;
706
707 tracing::debug!(
708 package = package.as_ref(),
709 version_requirement = requirement.to_string(),
710 registry_header = ?registry_domain,
711 "downloading",
712 );
713
714 match info.state.find_latest_release(requirement) {
715 Some(release) => {
716 let digest = release
717 .content()
718 .context("invalid state: not yanked but missing content")?
719 .clone();
720 let stream = self
721 .download_content_stream(registry_domain.as_ref(), &digest)
722 .await?;
723 Ok(Some((
724 PackageDownloadInfo {
725 version: release.version.clone(),
726 digest,
727 },
728 stream,
729 )))
730 }
731 None => Ok(None),
732 }
733 }
734
735 pub async fn download_exact(
745 &self,
746 package: &PackageName,
747 version: &Version,
748 ) -> Result<PackageDownload, ClientError> {
749 let info = self.package(package).await?;
750
751 let registry_domain = self.get_warg_registry(package.namespace()).await?;
752
753 tracing::debug!(
754 package = package.as_ref(),
755 version = version.to_string(),
756 registry_header = ?registry_domain,
757 "downloading exact version",
758 );
759
760 let release =
761 info.state
762 .release(version)
763 .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
764 version: version.clone(),
765 name: package.clone(),
766 })?;
767
768 let digest = release
769 .content()
770 .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
771 version: version.clone(),
772 name: package.clone(),
773 })?;
774
775 Ok(PackageDownload {
776 version: version.clone(),
777 digest: digest.clone(),
778 path: self
779 .download_content(registry_domain.as_ref(), digest)
780 .await?,
781 })
782 }
783
784 pub async fn download_exact_as_stream(
791 &self,
792 package: &PackageName,
793 version: &Version,
794 ) -> Result<(PackageDownloadInfo, impl Stream<Item = Result<Bytes>>), ClientError> {
795 let info = self.package(package).await?;
796
797 let registry_domain = self.get_warg_registry(package.namespace()).await?;
798
799 tracing::debug!(
800 package = package.as_ref(),
801 version = version.to_string(),
802 registry_header = ?registry_domain,
803 "downloading exact version",
804 );
805
806 let release =
807 info.state
808 .release(version)
809 .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
810 version: version.clone(),
811 name: package.clone(),
812 })?;
813
814 let digest = release
815 .content()
816 .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
817 version: version.clone(),
818 name: package.clone(),
819 })?;
820
821 Ok((
822 PackageDownloadInfo {
823 version: version.clone(),
824 digest: digest.clone(),
825 },
826 self.download_content_stream(registry_domain.as_ref(), digest)
827 .await?,
828 ))
829 }
830
831 async fn update_packages_and_return_federated_packages<'a>(
832 &self,
833 registry_domain: Option<&RegistryDomain>,
834 packages: impl IntoIterator<Item = &'a mut PackageInfo>,
835 ) -> Result<IndexMap<Option<RegistryDomain>, Vec<&'a mut PackageInfo>>, ClientError> {
836 let ts_checkpoint = self.api.latest_checkpoint(registry_domain).await?;
837 let checkpoint = &ts_checkpoint.as_ref().checkpoint;
838
839 tracing::debug!(
840 log_length = checkpoint.log_length,
841 registry_header = ?registry_domain,
842 "updating to checkpoint",
843 );
844
845 let mut operator = self
847 .registry
848 .load_operator(registry_domain)
849 .await?
850 .unwrap_or_default();
851
852 let mut packages = packages
854 .into_iter()
855 .filter_map(|p| match &p.checkpoint {
856 Some(c) if p.registry.is_some() && c == checkpoint => None,
859 _ => Some((LogId::package_log::<Sha256>(&p.name), p)),
860 })
861 .inspect(|(_, p)| tracing::info!("package `{name}` will be updated", name = p.name))
862 .collect::<IndexMap<_, _>>();
863
864 if operator.checkpoint.is_some_and(|c| &c == checkpoint) && packages.is_empty() {
866 return Ok(IndexMap::default());
867 }
868
869 let mut federated_packages: IndexMap<Option<RegistryDomain>, Vec<&mut PackageInfo>> =
871 IndexMap::with_capacity(packages.len());
872
873 let has_auth_token = self.api.auth_token().is_some();
875 loop {
876 let response = match self
877 .api
878 .fetch_logs(
879 registry_domain,
880 FetchLogsRequest {
881 log_length: checkpoint.log_length,
882 operator: operator
883 .head_fetch_token
884 .as_ref()
885 .map(|t| Cow::Borrowed(t.as_str())),
886 limit: None,
887 packages: Cow::Owned(
889 packages
890 .iter()
891 .map(|(id, p)| (id.clone(), p.head_fetch_token.clone()))
892 .collect::<IndexMap<_, _>>(),
893 ),
894 },
895 )
896 .await
897 .inspect(|res| {
898 for warning in res.warnings.iter() {
899 tracing::warn!("Fetch warning from registry: {}", warning.message);
900 }
901 }) {
902 Ok(res) => Ok(res),
903 Err(err) => match &err {
904 api::ClientError::Fetch(FetchError::LogNotFound(log_id))
905 | api::ClientError::Package(PackageError::LogNotFound(log_id)) => {
906 if let Some(name) = packages.get(log_id).map(|p| p.name.clone()) {
907 Err(ClientError::PackageDoesNotExist {
908 name,
909 has_auth_token,
910 })
911 } else {
912 Err(ClientError::Api(err))
913 }
914 }
915
916 api::ClientError::LogNotFoundWithHint(log_id, hint)
917 if self.disable_interactive =>
918 {
919 let name = packages.get(log_id).unwrap().name.clone();
920
921 match hint.to_str().ok().map(|s| s.split_once('=')) {
922 Some(Some((namespace, registry))) if packages.contains_key(log_id) => {
923 Err(ClientError::PackageDoesNotExistWithHintHeader {
924 name,
925 has_auth_token,
926 hint_namespace: namespace.to_string(),
927 hint_registry: registry.to_string(),
928 })
929 }
930 _ => Err(ClientError::PackageDoesNotExist {
931 name,
932 has_auth_token,
933 }),
934 }
935 }
936
937 #[cfg(feature = "cli-interactive")]
938 api::ClientError::LogNotFoundWithHint(log_id, hint) => {
939 match hint.to_str().ok().map(|s| s.split_once('=')) {
940 Some(Some((namespace, registry)))
941 if !self.ignore_federation_hints
942 && packages.contains_key(log_id) =>
943 {
944 use dialoguer::{theme::ColorfulTheme, Confirm};
945
946 let package_name = &packages.get(log_id).unwrap().name;
947
948 if !self.disable_auto_accept_federation_hints
949 || Confirm::with_theme(&ColorfulTheme::default())
950 .with_prompt(format!(
951"Package `{package_name}` is not in `{current_registry}` registry.
952Registry recommends using `{registry}` registry for packages in `{namespace}` namespace.
953Accept recommendation y/N\n",
954current_registry = registry_domain.map(|d| d.as_str()).unwrap_or(&self.url().safe_label()),
955))
956 .default(true)
957 .interact()
958 .unwrap()
959 {
960 let federated_registry_domain =
961 Some(RegistryDomain::from_str(registry)?);
962 self.store_namespace(
963 namespace.to_string(),
964 federated_registry_domain.clone().unwrap(),
965 )
966 .await?;
967
968 packages = packages
970 .into_iter()
971 .filter_map(|(log_id, package_info)| {
972 if package_info.name.namespace() == namespace {
973 if let Some(package_set) = federated_packages
974 .get_mut(&federated_registry_domain)
975 {
976 package_set.push(package_info);
977 } else {
978 federated_packages.insert(
979 federated_registry_domain.clone(),
980 vec![package_info],
981 );
982 }
983
984 None
985 } else {
986 Some((log_id, package_info))
987 }
988 })
989 .collect();
990
991 continue;
993 } else {
994 Err(ClientError::PackageDoesNotExist {
995 name: package_name.clone(),
996 has_auth_token,
997 })
998 }
999 }
1000 _ => {
1001 if let Some(name) = packages.get(log_id).map(|p| p.name.clone()) {
1002 Err(ClientError::PackageDoesNotExist {
1003 name,
1004 has_auth_token,
1005 })
1006 } else {
1007 Err(ClientError::Api(err))
1008 }
1009 }
1010 }
1011 }
1012 _ => Err(ClientError::Api(err)),
1013 },
1014 }?;
1015
1016 for record in response.operator {
1017 let proto_envelope: PublishedProtoEnvelope<operator::OperatorRecord> =
1018 record.envelope.try_into()?;
1019
1020 if operator.head_registry_index.is_none()
1022 || proto_envelope.registry_index > operator.head_registry_index.unwrap()
1023 {
1024 operator.state = operator
1025 .state
1026 .validate(&proto_envelope.envelope)
1027 .map_err(|inner| ClientError::OperatorValidationFailed { inner })?;
1028 operator.head_registry_index = Some(proto_envelope.registry_index);
1029 operator.head_fetch_token = Some(record.fetch_token);
1030 }
1031 }
1032
1033 for (log_id, records) in response.packages {
1034 let package = packages.get_mut(&log_id).ok_or_else(|| {
1035 anyhow!("received records for unknown package log `{log_id}`")
1036 })?;
1037
1038 for record in records {
1039 let proto_envelope: PublishedProtoEnvelope<package::PackageRecord> =
1040 record.envelope.try_into()?;
1041
1042 if package.head_registry_index.is_none()
1044 || proto_envelope.registry_index > package.head_registry_index.unwrap()
1045 {
1046 let state = std::mem::take(&mut package.state);
1047 package.state =
1048 state.validate(&proto_envelope.envelope).map_err(|inner| {
1049 ClientError::PackageValidationFailed {
1050 name: package.name.clone(),
1051 inner,
1052 }
1053 })?;
1054 package.head_registry_index = Some(proto_envelope.registry_index);
1055 package.head_fetch_token = Some(record.fetch_token);
1056 }
1057 }
1058
1059 if package.state.head().is_none() {
1061 return Err(ClientError::PackageLogEmpty {
1062 name: package.name.clone(),
1063 });
1064 }
1065 }
1066
1067 if !response.more {
1068 break;
1069 }
1070 }
1071
1072 TimestampedCheckpoint::verify(
1074 operator
1075 .state
1076 .public_key(ts_checkpoint.to_owned().to_owned().key_id())
1077 .ok_or(ClientError::InvalidCheckpointKeyId {
1078 key_id: ts_checkpoint.key_id().clone(),
1079 })?,
1080 &ts_checkpoint.as_ref().encode(),
1081 ts_checkpoint.signature(),
1082 )
1083 .or(Err(ClientError::InvalidCheckpointSignature))?;
1084
1085 let mut leaf_indices = Vec::with_capacity(packages.len() + 1 );
1087 let mut leafs = Vec::with_capacity(leaf_indices.len());
1088
1089 if let Some(index) = operator.head_registry_index {
1091 leaf_indices.push(index);
1092 leafs.push(LogLeaf {
1093 log_id: LogId::operator_log::<Sha256>(),
1094 record_id: operator.state.head().as_ref().unwrap().digest.clone(),
1095 });
1096 } else {
1097 return Err(ClientError::NoOperatorRecords);
1098 }
1099
1100 for (log_id, package) in &packages {
1102 if let Some(index) = package.head_registry_index {
1103 leaf_indices.push(index);
1104 leafs.push(LogLeaf {
1105 log_id: log_id.clone(),
1106 record_id: package.state.head().as_ref().unwrap().digest.clone(),
1107 });
1108 } else {
1109 return Err(ClientError::PackageLogEmpty {
1110 name: package.name.clone(),
1111 });
1112 }
1113 }
1114
1115 if !leafs.is_empty() {
1116 self.api
1117 .prove_inclusion(
1118 registry_domain,
1119 InclusionRequest {
1120 log_length: checkpoint.log_length,
1121 leafs: leaf_indices,
1122 },
1123 checkpoint,
1124 &leafs,
1125 )
1126 .await?;
1127 }
1128
1129 if let Some(from) = self.registry.load_checkpoint(registry_domain).await? {
1130 let from_log_length = from.as_ref().checkpoint.log_length;
1131 let to_log_length = ts_checkpoint.as_ref().checkpoint.log_length;
1132
1133 match from_log_length.cmp(&to_log_length) {
1134 Ordering::Greater => {
1135 return Err(ClientError::CheckpointLogLengthRewind {
1136 from: from_log_length,
1137 to: to_log_length,
1138 });
1139 }
1140 Ordering::Less => {
1141 self.api
1142 .prove_log_consistency(
1143 registry_domain,
1144 ConsistencyRequest {
1145 from: from_log_length,
1146 to: to_log_length,
1147 },
1148 Cow::Borrowed(&from.as_ref().checkpoint.log_root),
1149 Cow::Borrowed(&ts_checkpoint.as_ref().checkpoint.log_root),
1150 )
1151 .await?
1152 }
1153 Ordering::Equal => {
1154 if from.as_ref().checkpoint.log_root
1155 != ts_checkpoint.as_ref().checkpoint.log_root
1156 || from.as_ref().checkpoint.map_root
1157 != ts_checkpoint.as_ref().checkpoint.map_root
1158 {
1159 return Err(ClientError::CheckpointChangedLogRootOrMapRoot {
1160 log_length: from_log_length,
1161 });
1162 }
1163 }
1164 }
1165 }
1166
1167 operator.registry = registry_domain
1168 .cloned()
1169 .or_else(|| Some(self.url().registry_domain()));
1170 operator.checkpoint = Some(checkpoint.clone()); self.registry
1172 .store_operator(registry_domain, operator)
1173 .await?;
1174
1175 for package in packages.values_mut() {
1176 package.registry = registry_domain
1177 .cloned()
1178 .or_else(|| Some(self.url().registry_domain()));
1179 package.checkpoint = Some(checkpoint.clone()); self.registry
1181 .store_package(registry_domain, package)
1182 .await?;
1183 }
1184
1185 self.registry
1186 .store_checkpoint(registry_domain, &ts_checkpoint)
1187 .await?;
1188
1189 Ok(federated_packages)
1191 }
1192
1193 async fn update_checkpoints<'a>(
1195 &self,
1196 packages: impl IntoIterator<Item = &mut PackageInfo>,
1197 ) -> Result<(), ClientError> {
1198 let mut federated_packages: IndexMap<Option<RegistryDomain>, Vec<&mut PackageInfo>> =
1200 IndexMap::new();
1201 for package in packages.into_iter() {
1202 let registry_domain = self.get_warg_registry(package.name.namespace()).await?;
1203 if let Some(package_set) = federated_packages.get_mut(®istry_domain) {
1204 package_set.push(package);
1205 } else {
1206 federated_packages.insert(registry_domain, vec![package]);
1207 }
1208 }
1209
1210 while let Some((registry_domain, packages)) = federated_packages.pop() {
1211 for (registry_domain, packages) in self
1212 .update_packages_and_return_federated_packages(registry_domain.as_ref(), packages)
1213 .await?
1214 .into_iter()
1215 {
1216 if let Some(package_set) = federated_packages.get_mut(®istry_domain) {
1217 package_set.extend(packages);
1218 } else {
1219 federated_packages.insert(registry_domain, packages);
1220 }
1221 }
1222 }
1223
1224 Ok(())
1225 }
1226
1227 pub async fn fetch_packages(
1229 &self,
1230 names: impl IntoIterator<Item = &PackageName>,
1231 ) -> Result<Vec<PackageInfo>, ClientError> {
1232 let mut packages: Vec<PackageInfo> = names
1233 .into_iter()
1234 .map(|name| PackageInfo::new(name.clone()))
1235 .collect();
1236 self.update_checkpoints(packages.iter_mut()).await?;
1237 Ok(packages)
1238 }
1239
1240 pub async fn fetch_package(&self, name: &PackageName) -> Result<PackageInfo, ClientError> {
1242 let mut info = PackageInfo::new(name.clone());
1243 self.update_checkpoints([&mut info]).await?;
1244 Ok(info)
1245 }
1246
1247 pub async fn package(&self, name: &PackageName) -> Result<PackageInfo, ClientError> {
1250 let registry_domain = self.get_warg_registry(name.namespace()).await?;
1251 match self
1252 .registry
1253 .load_package(registry_domain.as_ref(), name)
1254 .await?
1255 {
1256 Some(mut info) => {
1257 tracing::info!("log for package `{name}` already exists in storage");
1258 if info.registry.is_none() {
1259 info.registry = registry_domain
1260 .clone()
1261 .or_else(|| Some(self.url().registry_domain()));
1262 }
1263 Ok(info)
1264 }
1265 None => {
1266 let mut info = PackageInfo::new(name.clone());
1267 self.update_checkpoints([&mut info]).await?;
1268 Ok(info)
1269 }
1270 }
1271 }
1272
1273 async fn get_package_record(
1274 &self,
1275 registry_domain: Option<&RegistryDomain>,
1276 package: &PackageName,
1277 log_id: &LogId,
1278 record_id: &RecordId,
1279 ) -> ClientResult<PackageRecord> {
1280 let record = self
1281 .api
1282 .get_package_record(registry_domain, log_id, record_id)
1283 .await
1284 .map_err(|e| match e {
1285 api::ClientError::Package(PackageError::Rejection(reason)) => {
1286 ClientError::PublishRejected {
1287 name: package.clone(),
1288 reason,
1289 record_id: record_id.clone(),
1290 }
1291 }
1292 e => {
1293 ClientError::translate_log_not_found(e, self.api.auth_token().is_some(), |id| {
1294 if id == log_id {
1295 Some(package.clone())
1296 } else {
1297 None
1298 }
1299 })
1300 }
1301 })?;
1302 Ok(record)
1303 }
1304
1305 async fn download_content(
1310 &self,
1311 registry_domain: Option<&RegistryDomain>,
1312 digest: &AnyHash,
1313 ) -> Result<PathBuf, ClientError> {
1314 match self.content.content_location(digest) {
1315 Some(path) => {
1316 tracing::info!("content for digest `{digest}` already exists in storage");
1317 Ok(path)
1318 }
1319 None => {
1320 self.content
1321 .store_content(
1322 Box::pin(self.api.download_content(registry_domain, digest).await?),
1323 Some(digest),
1324 )
1325 .await?;
1326
1327 self.content
1328 .content_location(digest)
1329 .ok_or_else(|| ClientError::ContentNotFound {
1330 digest: digest.clone(),
1331 })
1332 }
1333 }
1334 }
1335
1336 async fn download_content_stream(
1342 &self,
1343 registry_domain: Option<&RegistryDomain>,
1344 digest: &AnyHash,
1345 ) -> Result<impl Stream<Item = Result<Bytes>>, ClientError> {
1346 match self.content.content_location(digest) {
1347 Some(path) => {
1348 tracing::info!("content for digest `{digest}` already exists in storage");
1349 let file = tokio::fs::File::open(path)
1350 .await
1351 .map_err(ClientError::IoError)?;
1352 Ok(ReaderStream::new(file).map_err(Into::into).boxed())
1353 }
1354 None => Ok(Box::pin(
1355 self.api.download_content(registry_domain, digest).await?,
1356 )),
1357 }
1358 }
1359}
1360pub type FileSystemClient =
1363 Client<FileSystemRegistryStorage, FileSystemContentStorage, FileSystemNamespaceMapStorage>;
1364
1365pub enum StorageLockResult<T> {
1367 Acquired(T),
1369 NotAcquired(PathBuf),
1371}
1372
1373impl FileSystemClient {
1374 async fn storage_paths(
1375 url: Option<&str>,
1376 config: &Config,
1377 disable_interactive: bool,
1378 ) -> Result<StoragePaths, ClientError> {
1379 let checking_url_for_well_known = RegistryUrl::new(
1380 url.or(config.home_url.as_deref())
1381 .unwrap_or(DEFAULT_REGISTRY),
1382 )?;
1383
1384 let url = if let Some(warg_url) =
1385 api::Client::new(checking_url_for_well_known.to_string(), None)?
1386 .well_known_config()
1387 .await?
1388 {
1389 if !disable_interactive && warg_url != checking_url_for_well_known {
1390 println!(
1391 "Resolved `{well_known}` to registry hosted on `{registry}`",
1392 well_known = checking_url_for_well_known.registry_domain(),
1393 registry = warg_url.registry_domain(),
1394 );
1395 }
1396 warg_url
1397 } else {
1398 RegistryUrl::new(
1399 url.or(config.home_url.as_deref())
1400 .ok_or(ClientError::NoHomeRegistryUrl)?,
1401 )?
1402 };
1403
1404 config.storage_paths_for_url(url)
1405 }
1406
1407 pub async fn try_new_with_config(
1416 registry: Option<&str>,
1417 config: &Config,
1418 mut auth_token: Option<Secret<String>>,
1419 ) -> Result<StorageLockResult<Self>, ClientError> {
1420 let disable_interactive =
1421 cfg!(not(feature = "cli-interactive")) || config.disable_interactive;
1422
1423 let StoragePaths {
1424 registry_url: url,
1425 registries_dir,
1426 content_dir,
1427 namespace_map_path,
1428 } = Self::storage_paths(registry, config, disable_interactive).await?;
1429
1430 let (keyring_backend, keys) = if cfg!(feature = "keyring") {
1431 (config.keyring_backend.clone(), config.keys.clone())
1432 } else {
1433 (None, IndexSet::new())
1434 };
1435
1436 #[cfg(feature = "keyring")]
1437 if auth_token.is_none() && config.keyring_auth {
1438 auth_token = crate::keyring::Keyring::from_config(config)?.get_auth_token(&url)?
1439 }
1440
1441 let (packages, content, namespace_map) = match (
1442 FileSystemRegistryStorage::try_lock(registries_dir.clone())?,
1443 FileSystemContentStorage::try_lock(content_dir.clone())?,
1444 FileSystemNamespaceMapStorage::new(namespace_map_path.clone()),
1445 ) {
1446 (Some(packages), Some(content), namespace_map) => (packages, content, namespace_map),
1447 (None, _, _) => return Ok(StorageLockResult::NotAcquired(registries_dir)),
1448 (_, None, _) => return Ok(StorageLockResult::NotAcquired(content_dir)),
1449 };
1450
1451 Ok(StorageLockResult::Acquired(Self::new(
1452 url.into_url(),
1453 packages,
1454 content,
1455 namespace_map,
1456 auth_token,
1457 config.ignore_federation_hints,
1458 config.disable_auto_accept_federation_hints,
1459 config.disable_auto_package_init,
1460 disable_interactive,
1461 keyring_backend,
1462 keys,
1463 )?))
1464 }
1465
1466 pub async fn try_new_with_default_config(
1478 url: Option<&str>,
1479 ) -> Result<StorageLockResult<Self>, ClientError> {
1480 Self::try_new_with_config(url, &Config::from_default_file()?.unwrap_or_default(), None)
1481 .await
1482 }
1483
1484 pub async fn new_with_config(
1491 registry: Option<&str>,
1492 config: &Config,
1493 mut auth_token: Option<Secret<String>>,
1494 ) -> Result<Self, ClientError> {
1495 let disable_interactive =
1496 cfg!(not(feature = "cli-interactive")) || config.disable_interactive;
1497
1498 let StoragePaths {
1499 registry_url: url,
1500 registries_dir,
1501 content_dir,
1502 namespace_map_path,
1503 } = Self::storage_paths(registry, config, disable_interactive).await?;
1504
1505 let (keyring_backend, keys) = if cfg!(feature = "keyring") {
1506 (config.keyring_backend.clone(), config.keys.clone())
1507 } else {
1508 (None, IndexSet::new())
1509 };
1510
1511 #[cfg(feature = "keyring")]
1512 if auth_token.is_none() && config.keyring_auth {
1513 auth_token = crate::keyring::Keyring::from_config(config)?.get_auth_token(&url)?
1514 }
1515
1516 Self::new(
1517 url.into_url(),
1518 FileSystemRegistryStorage::lock(registries_dir)?,
1519 FileSystemContentStorage::lock(content_dir)?,
1520 FileSystemNamespaceMapStorage::new(namespace_map_path),
1521 auth_token,
1522 config.ignore_federation_hints,
1523 config.disable_auto_accept_federation_hints,
1524 config.disable_auto_package_init,
1525 disable_interactive,
1526 keyring_backend,
1527 keys,
1528 )
1529 }
1530
1531 pub async fn new_with_default_config(url: Option<&str>) -> Result<Self, ClientError> {
1541 Self::new_with_config(url, &Config::from_default_file()?.unwrap_or_default(), None).await
1542 }
1543}
1544
1545#[derive(Debug, Clone)]
1547pub struct PackageDownload {
1548 pub version: Version,
1550 pub digest: AnyHash,
1552 pub path: PathBuf,
1554}
1555
1556pub struct PackageDownloadInfo {
1558 pub version: Version,
1560 pub digest: AnyHash,
1562}
1563
1564#[derive(Debug, Error)]
1566pub enum ClientError {
1567 #[error("no home registry registry server URL is configured")]
1569 NoHomeRegistryUrl,
1570
1571 #[error("reset registry state failed")]
1573 ResettingRegistryLocalStateFailed,
1574
1575 #[error("clear content cache failed")]
1577 ClearContentCacheFailed,
1578
1579 #[error("unauthorized: {0}")]
1581 Unauthorized(String),
1582
1583 #[error("invalid checkpoint signature")]
1585 InvalidCheckpointSignature,
1586
1587 #[error("invalid checkpoint key ID `{key_id}`")]
1589 InvalidCheckpointKeyId {
1590 key_id: signing::KeyID,
1592 },
1593
1594 #[error("the server did not provide any operator records")]
1596 NoOperatorRecords,
1597
1598 #[error("operator failed validation: {inner}")]
1600 OperatorValidationFailed {
1601 inner: operator::ValidationError,
1603 },
1604
1605 #[error("package `{name}` already exists and cannot be initialized")]
1607 CannotInitializePackage {
1608 name: PackageName,
1610 init_record_id: Option<RecordId>,
1612 },
1613
1614 #[error("package `{name}` must be initialized before publishing")]
1616 MustInitializePackage {
1617 name: PackageName,
1619 has_auth_token: bool,
1621 },
1622
1623 #[error("there is no publish operation in progress")]
1625 NotPublishing,
1626
1627 #[error("package `{name}` has no records to publish")]
1629 NothingToPublish {
1630 name: PackageName,
1632 },
1633
1634 #[error("package `{name}` does not exist")]
1636 PackageDoesNotExist {
1637 name: PackageName,
1639 has_auth_token: bool,
1641 },
1642
1643 #[error("package `{name}` does not exist but the registry suggests checking registry `{hint_registry}` for packages in namespace `{hint_namespace}`")]
1645 PackageDoesNotExistWithHintHeader {
1646 name: PackageName,
1648 has_auth_token: bool,
1650 hint_namespace: String,
1652 hint_registry: String,
1654 },
1655
1656 #[error("version `{version}` of package `{name}` does not exist")]
1658 PackageVersionDoesNotExist {
1659 version: Version,
1661 name: PackageName,
1663 },
1664
1665 #[error("version that satisfies requirement `{version}` was not found for package `{name}`")]
1667 PackageVersionRequirementDoesNotExist {
1668 version: VersionReq,
1670 name: PackageName,
1672 },
1673
1674 #[error("package `{name}` failed validation: {inner}")]
1676 PackageValidationFailed {
1677 name: PackageName,
1679 inner: package::ValidationError,
1681 },
1682
1683 #[error("content with digest `{digest}` was not found in client storage")]
1685 ContentNotFound {
1686 digest: AnyHash,
1688 },
1689
1690 #[error("content with digest `{digest}` was not found expected `{expected}`")]
1692 IncorrectContent {
1693 digest: AnyHash,
1695 expected: AnyHash,
1697 },
1698
1699 #[error("package log is empty and cannot be validated")]
1701 PackageLogEmpty {
1702 name: PackageName,
1704 },
1705
1706 #[error("the publishing of package `{name}` was rejected due to: {reason}")]
1708 PublishRejected {
1709 name: PackageName,
1711 record_id: RecordId,
1713 reason: String,
1715 },
1716
1717 #[error("the publishing of package `{name}` was rejected due to conflicting pending publish of record `{pending_record_id}`")]
1719 ConflictPendingPublish {
1720 name: PackageName,
1722 record_id: RecordId,
1724 pending_record_id: RecordId,
1726 },
1727
1728 #[error("the package is still missing content after all content was uploaded")]
1730 PackageMissingContent,
1731
1732 #[error("registry rewinded checkpoints; latest checkpoint log length `{to}` is less than previously received checkpoint log length `{from}`")]
1735 CheckpointLogLengthRewind {
1736 from: RegistryLen,
1738 to: RegistryLen,
1740 },
1741
1742 #[error("registry provided a new checkpoint with the same log length `{log_length}` as previously fetched but different log root or map root")]
1745 CheckpointChangedLogRootOrMapRoot {
1746 log_length: RegistryLen,
1748 },
1749
1750 #[error(transparent)]
1752 Keyring(#[from] crate::keyring::KeyringError),
1753
1754 #[error(transparent)]
1756 Api(#[from] api::ClientError),
1757
1758 #[error("{0:?}")]
1760 Other(#[from] anyhow::Error),
1761
1762 #[error("error: {0:?}")]
1764 IoError(#[from] std::io::Error),
1765}
1766
1767impl ClientError {
1768 fn translate_log_not_found(
1769 e: api::ClientError,
1770 has_auth_token: bool,
1771 lookup: impl Fn(&LogId) -> Option<PackageName>,
1772 ) -> Self {
1773 match &e {
1774 api::ClientError::Fetch(FetchError::LogNotFound(id))
1775 | api::ClientError::Package(PackageError::LogNotFound(id)) => {
1776 if let Some(name) = lookup(id) {
1777 return Self::PackageDoesNotExist {
1778 name,
1779 has_auth_token,
1780 };
1781 }
1782 }
1783 _ => {}
1784 }
1785
1786 Self::Api(e)
1787 }
1788}
1789
1790pub type ClientResult<T> = Result<T, ClientError>;