1#![deny(missing_docs)]
4use crate::storage::PackageInfo;
5
6use anyhow::{anyhow, Context, Result};
7use bytes::Bytes;
8use futures_util::{Stream, StreamExt, TryStreamExt};
9use indexmap::{IndexMap, IndexSet};
10use reqwest::{Body, IntoUrl};
11use secrecy::Secret;
12use semver::{Version, VersionReq};
13use std::cmp::Ordering;
14use std::fs;
15use std::str::FromStr;
16use std::{borrow::Cow, path::PathBuf, time::Duration};
17use storage::{
18 ContentStorage, FileSystemContentStorage, FileSystemNamespaceMapStorage,
19 FileSystemRegistryStorage, NamespaceMapStorage, PublishInfo, RegistryDomain, RegistryStorage,
20};
21use thiserror::Error;
22use tokio_util::io::ReaderStream;
23use warg_api::v1::{
24 fetch::{FetchError, FetchLogsRequest},
25 package::{
26 MissingContent, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest,
27 UploadEndpoint,
28 },
29 proof::{ConsistencyRequest, InclusionRequest},
30};
31use warg_crypto::hash::Sha256;
32use warg_crypto::{hash::AnyHash, signing, Encode, Signable};
33use warg_protocol::package::ReleaseState;
34use warg_protocol::{
35 operator, package,
36 registry::{LogId, LogLeaf, PackageName, RecordId, RegistryLen, TimestampedCheckpoint},
37 PublishedProtoEnvelope,
38};
39use wasm_compose::graph::{CompositionGraph, EncodeOptions, ExportIndex, InstanceId};
40use wasmparser::Validator;
41
42#[cfg(feature = "keyring")]
43pub mod keyring;
44
45pub mod api;
46mod config;
47pub mod depsolve;
49use depsolve::{Bundler, LockListBuilder};
50pub mod version_util;
52use version_util::{kindless_name, locked_package, versioned_package, Import, ImportKind};
53pub mod lock;
54mod registry_url;
55pub mod storage;
56pub use self::config::*;
57pub use self::registry_url::RegistryUrl;
58
59const DEFAULT_WAIT_INTERVAL: Duration = Duration::from_secs(1);
60
61pub const DEFAULT_REGISTRY: &str = "bytecodealliance.org";
64
65pub struct Client<R, C, N>
67where
68 R: RegistryStorage,
69 C: ContentStorage,
70 N: NamespaceMapStorage,
71{
72 registry: R,
73 content: C,
74 namespace_map: N,
75 api: api::Client,
76 ignore_federation_hints: bool,
77 disable_auto_accept_federation_hints: bool,
78 disable_auto_package_init: bool,
79 disable_interactive: bool,
80 keyring_backend: Option<String>,
81 keys: IndexSet<String>,
82}
83
84impl<R: RegistryStorage, C: ContentStorage, N: NamespaceMapStorage> Client<R, C, N> {
85 #[allow(clippy::too_many_arguments)]
88 pub fn new(
89 url: impl IntoUrl,
90 registry: R,
91 content: C,
92 namespace_map: N,
93 auth_token: Option<Secret<String>>,
94 ignore_federation_hints: bool,
95 disable_auto_accept_federation_hints: bool,
96 disable_auto_package_init: bool,
97 disable_interactive: bool,
98 keyring_backend: Option<String>,
99 keys: IndexSet<String>,
100 ) -> ClientResult<Self> {
101 let api = api::Client::new(url, auth_token)?;
102 Ok(Self {
103 registry,
104 content,
105 namespace_map,
106 api,
107 ignore_federation_hints,
108 disable_auto_accept_federation_hints,
109 disable_auto_package_init,
110 disable_interactive,
111 keyring_backend,
112 keys,
113 })
114 }
115
116 pub fn url(&self) -> &RegistryUrl {
118 self.api.url()
119 }
120
121 pub fn registry(&self) -> &R {
123 &self.registry
124 }
125
126 pub fn content(&self) -> &C {
128 &self.content
129 }
130
131 pub fn namespace_map(&self) -> &N {
133 &self.namespace_map
134 }
135
136 pub async fn get_warg_registry(
138 &self,
139 namespace: &str,
140 ) -> Result<Option<RegistryDomain>, ClientError> {
141 let operator = self
142 .registry()
143 .load_operator(Some(&RegistryDomain::from_str(namespace)?))
144 .await?;
145 if let Some(op) = operator {
146 match op.state.namespace_state(namespace) {
147 Some(warg_protocol::operator::NamespaceState::Imported { registry }) => {
148 return Ok(Some(RegistryDomain::from_str(registry)?));
149 }
150 Some(warg_protocol::operator::NamespaceState::Defined) => {
151 return Ok(None);
152 }
153 _ => (),
154 }
155 };
156 let nm_map = self.namespace_map.load_namespace_map().await?;
157 Ok(nm_map.and_then(|nm_map| {
158 nm_map
159 .get(namespace)
160 .map(|domain| RegistryDomain::from_str(domain).unwrap())
161 }))
162 }
163
164 pub async fn store_namespace(
166 &self,
167 namespace: String,
168 registry_domain: RegistryDomain,
169 ) -> Result<()> {
170 self.namespace_map
171 .store_namespace(namespace, registry_domain)
172 .await?;
173 Ok(())
174 }
175
176 pub async fn reset_namespaces(&self) -> Result<()> {
178 self.namespace_map.reset_namespaces().await?;
179 Ok(())
180 }
181
182 pub async fn reset_registry(&self) -> ClientResult<()> {
184 tracing::info!("resetting registry local state");
185 self.registry
186 .reset(true)
187 .await
188 .or(Err(ClientError::ResettingRegistryLocalStateFailed))
189 }
190
191 pub async fn clear_content_cache(&self) -> ClientResult<()> {
193 tracing::info!("removing content cache");
194 self.content
195 .clear()
196 .await
197 .or(Err(ClientError::ClearContentCacheFailed))
198 }
199
200 pub async fn lock_component(&self, info: &PackageInfo) -> ClientResult<Vec<u8>> {
202 let mut builder = LockListBuilder::default();
203 builder.build_list(self, info).await?;
204 let top = Import {
205 name: format!("{}:{}", info.name.namespace(), info.name.name()),
206 req: VersionReq::STAR,
207 kind: ImportKind::Unlocked,
208 };
209 builder.lock_list.insert(top);
210 let mut composer = CompositionGraph::new();
211 let mut handled = IndexMap::<String, InstanceId>::new();
212 for package in builder.lock_list {
213 let name = package.name.clone();
214 let version = package.req;
215 let id = PackageName::new(name)?;
216 let info = self
217 .registry()
218 .load_package(self.get_warg_registry(id.namespace()).await?.as_ref(), &id)
219 .await?;
220 if let Some(inf) = info {
221 let release = if version != VersionReq::STAR {
222 inf.state
223 .releases()
224 .filter(|r| version.matches(&r.version))
225 .last()
226 } else {
227 inf.state.releases().last()
228 };
229
230 if let Some(r) = release {
231 let state = &r.state;
232 if let ReleaseState::Released { content } = state {
233 let locked_package = locked_package(&package.name, r, content);
234 let path = self.content().content_location(content);
235 if let Some(p) = path {
236 let bytes = fs::read(&p).map_err(|_| ClientError::ContentNotFound {
237 digest: content.clone(),
238 })?;
239
240 let read_digest =
241 AnyHash::from_str(&format!("sha256:{}", sha256::digest(bytes)))
242 .unwrap();
243 if content != &read_digest {
244 return Err(ClientError::IncorrectContent {
245 digest: read_digest,
246 expected: content.clone(),
247 });
248 }
249 let mut validator = Validator::new();
250 let component = wasm_compose::graph::Component::from_file(
251 &mut validator,
252 &locked_package,
253 p,
254 )?;
255 let component_id = if let Some((id, _)) =
256 composer.get_component_by_name(&locked_package)
257 {
258 id
259 } else {
260 composer.add_component(component)?
261 };
262 let instance_id = composer.instantiate(component_id)?;
263 let added = composer.get_component(component_id);
264 handled.insert(versioned_package(&package.name, version), instance_id);
265 let mut args = Vec::new();
266 if let Some(added) = added {
267 for (index, name, _) in added.imports() {
268 let iid = handled.get(kindless_name(name));
269 if let Some(arg) = iid {
270 args.push((arg, index));
271 }
272 }
273 }
274 for arg in args {
275 composer.connect(
276 *arg.0,
277 None::<ExportIndex>,
278 instance_id,
279 arg.1,
280 )?;
281 }
282 }
283 }
284 }
285 }
286 }
287 let final_name = &format!("{}:{}", info.name.namespace(), &info.name.name());
288 let id = handled.get(final_name);
289 let options = EncodeOptions {
290 export: id.copied(),
291 ..Default::default()
292 };
293 let locked = composer.encode(options)?;
294 fs::write("./locked.wasm", locked.as_slice()).map_err(|e| ClientError::Other(e.into()))?;
295 Ok(locked)
296 }
297
298 pub async fn bundle_component(&self, info: &PackageInfo) -> ClientResult<Vec<u8>> {
300 let mut bundler = Bundler::new(self);
301 let path = PathBuf::from("./locked.wasm");
302 let locked = if !path.is_file() {
303 self.lock_component(info).await?
304 } else {
305 fs::read("./locked.wasm").map_err(|e| ClientError::Other(e.into()))?
306 };
307 let bundled = bundler.parse(&locked).await?;
308 fs::write("./bundled.wasm", bundled.as_slice())
309 .map_err(|e| ClientError::Other(e.into()))?;
310 Ok(bundled.as_slice().to_vec())
311 }
312
313 pub async fn publish(&self, signing_key: &signing::PrivateKey) -> ClientResult<RecordId> {
321 let info = self
322 .registry
323 .load_publish()
324 .await?
325 .ok_or(ClientError::NotPublishing)?;
326
327 let res = self.publish_with_info(signing_key, info).await;
328 self.registry.store_publish(None).await?;
329 res
330 }
331
332 #[cfg(feature = "keyring")]
341 pub async fn sign_with_keyring_and_publish(
342 &self,
343 publish_info: Option<PublishInfo>,
344 ) -> ClientResult<RecordId> {
345 let publish_info = if let Some(publish_info) = publish_info {
346 publish_info
347 } else {
348 self.registry
349 .load_publish()
350 .await?
351 .ok_or(ClientError::NotPublishing)?
352 };
353
354 let registry_domain = self
355 .get_warg_registry(publish_info.name.namespace())
356 .await?;
357 let signing_key = keyring::Keyring::new(
358 self.keyring_backend
359 .as_deref()
360 .unwrap_or(keyring::Keyring::DEFAULT_BACKEND),
361 )?
362 .get_signing_key(
363 registry_domain.map(|domain| domain.to_string()).as_deref(),
364 &self.keys,
365 Some(&self.url().to_string()),
366 )?;
367
368 let res = self.publish_with_info(&signing_key, publish_info).await;
369 self.registry.store_publish(None).await?;
370 res
371 }
372
373 pub async fn publish_with_info(
381 &self,
382 signing_key: &signing::PrivateKey,
383 publish_info: PublishInfo,
384 ) -> ClientResult<RecordId> {
385 if publish_info.entries.is_empty() {
386 return Err(ClientError::NothingToPublish {
387 name: publish_info.name.clone(),
388 });
389 }
390
391 tracing::info!(
392 "publishing {new}package `{name}`",
393 name = publish_info.name,
394 new = if publish_info.initializing() {
395 "new "
396 } else {
397 ""
398 }
399 );
400 tracing::debug!("entries: {:?}", publish_info.entries);
401
402 let mut accepted_prompt_to_initialize = false;
403
404 let mut init_record_id: Option<RecordId> = None;
405
406 let (package, record) = loop {
407 let mut info = publish_info.clone();
408
409 let mut initializing = info.initializing();
410
411 let package = match self.fetch_package(&info.name).await {
412 Ok(package) => {
413 if initializing {
414 return Err(ClientError::CannotInitializePackage {
415 name: package.name,
416 init_record_id,
417 });
418 } else if info.head.is_none() {
419 info.head = package.state.head().as_ref().map(|h| h.digest.clone());
422 }
423 package
424 }
425 Err(ClientError::PackageDoesNotExist {
426 name,
427 has_auth_token,
428 }) => {
429 if !initializing {
430 if !self.disable_auto_package_init {
431 info.entries.insert(0, crate::storage::PublishEntry::Init);
432 initializing = true;
433 accepted_prompt_to_initialize = true;
434 } else {
435 if self.disable_interactive || cfg!(not(feature = "cli-interactive")) {
436 return Err(ClientError::MustInitializePackage {
437 name,
438 has_auth_token,
439 });
440 }
441
442 #[cfg(feature = "cli-interactive")]
443 {
444 use crate::storage::PublishEntry;
445 use dialoguer::{theme::ColorfulTheme, Confirm};
446
447 if accepted_prompt_to_initialize
448 || Confirm::with_theme(&ColorfulTheme::default())
449 .with_prompt(format!(
450 "Package `{package_name}` was not found.
451 If it exists, you may not have access.
452 Attempt to create `{package_name}` and publish the release y/N\n",
453 package_name = &info.name,
454 ))
455 .default(false)
456 .interact()
457 .unwrap()
458 {
459 info.entries.insert(0, PublishEntry::Init);
460 initializing = true;
461 accepted_prompt_to_initialize = true;
462 } else {
463 return Err(ClientError::MustInitializePackage {
464 name,
465 has_auth_token,
466 });
467 }
468 }
469 }
470 }
471 PackageInfo::new(info.name.clone())
472 }
473 err => err?,
474 };
475 let registry_domain = self.get_warg_registry(package.name.namespace()).await?;
476
477 let log_id = LogId::package_log::<Sha256>(&package.name);
478 let record = info.finalize(signing_key)?;
479 let record_id = RecordId::package_record::<Sha256>(&record);
480 let record = match self
481 .api
482 .publish_package_record(
483 registry_domain.as_ref(),
484 &log_id,
485 PublishRecordRequest {
486 package_name: Cow::Borrowed(&package.name),
487 record: Cow::Owned(record.into()),
488 content_sources: Default::default(),
489 },
490 )
491 .await
492 {
493 Ok(record) => Ok(record),
494 Err(api::ClientError::Package(PackageError::Rejection(reason))) => {
495 Err(ClientError::PublishRejected {
496 name: package.name.clone(),
497 reason,
498 record_id,
499 })
500 }
501 Err(api::ClientError::Package(PackageError::Unauthorized(reason))) => {
502 Err(ClientError::Unauthorized(reason))
503 }
504 Err(api::ClientError::Package(PackageError::ConflictPendingPublish(
505 pending_record_id,
506 ))) => {
507 tracing::info!("waiting for conflicting publish to complete");
509 if initializing {
511 match self.fetch_package(&package.name).await {
512 Ok(_) => {}
513 Err(ClientError::PackageDoesNotExist { .. }) => {}
515 Err(err) => return Err(err),
516 }
517 init_record_id = Some(pending_record_id.clone());
518 }
519 self.wait_for_publish(&package.name, &pending_record_id, DEFAULT_WAIT_INTERVAL)
520 .await
521 .map_err(|err| match err {
522 ClientError::PackageMissingContent => {
523 ClientError::ConflictPendingPublish {
524 name: package.name.clone(),
525 record_id,
526 pending_record_id,
527 }
528 }
529 err => err,
530 })?;
531
532 continue;
533 }
534 Err(e) => Err(ClientError::translate_log_not_found(
535 e,
536 self.api.auth_token().is_some(),
537 |id| {
538 if id == &log_id {
539 Some(package.name.clone())
540 } else {
541 None
542 }
543 },
544 )),
545 }?;
546
547 break (package, record);
548 };
549
550 for (digest, MissingContent { upload }) in record.missing_content() {
552 let Some(UploadEndpoint::Http {
554 method,
555 url,
556 headers,
557 }) = upload.first()
558 else {
559 continue;
560 };
561
562 self.api
563 .upload_content(
564 method,
565 url,
566 headers,
567 Body::wrap_stream(self.content.load_content(digest).await?.ok_or_else(
568 || ClientError::ContentNotFound {
569 digest: digest.clone(),
570 },
571 )?),
572 )
573 .await
574 .map_err(|e| match e {
575 api::ClientError::Package(PackageError::Rejection(reason)) => {
576 ClientError::PublishRejected {
577 name: package.name.clone(),
578 record_id: record.record_id.clone(),
579 reason,
580 }
581 }
582 api::ClientError::Package(PackageError::Unauthorized(reason)) => {
583 ClientError::Unauthorized(reason)
584 }
585 _ => e.into(),
586 })?;
587 }
588
589 Ok(record.record_id)
590 }
591
592 pub async fn wait_for_publish(
598 &self,
599 package: &PackageName,
600 record_id: &RecordId,
601 interval: Duration,
602 ) -> ClientResult<()> {
603 let registry_domain = self.get_warg_registry(package.namespace()).await?;
604 let log_id = LogId::package_log::<Sha256>(package);
605 let mut current = self
606 .get_package_record(registry_domain.as_ref(), package, &log_id, record_id)
607 .await?;
608
609 loop {
610 match current.state {
611 PackageRecordState::Sourcing { .. } => {
612 return Err(ClientError::PackageMissingContent);
613 }
614 PackageRecordState::Published { .. } => {
615 self.fetch_package(package).await?;
616 return Ok(());
617 }
618 PackageRecordState::Rejected { reason } => {
619 return Err(ClientError::PublishRejected {
620 name: package.clone(),
621 record_id: record_id.clone(),
622 reason,
623 });
624 }
625 PackageRecordState::Processing => {
626 tokio::time::sleep(interval).await;
627 current = self
628 .get_package_record(registry_domain.as_ref(), package, &log_id, record_id)
629 .await?;
630 }
631 }
632 }
633 }
634
635 pub async fn update(&self) -> ClientResult<()> {
637 tracing::info!("updating downloaded package logs");
638
639 for mut packages in self.registry.load_all_packages().await?.into_values() {
640 self.update_checkpoints(&mut packages).await?;
641 }
642
643 Ok(())
644 }
645
646 pub async fn download(
660 &self,
661 package: &PackageName,
662 requirement: &VersionReq,
663 ) -> Result<Option<PackageDownload>, ClientError> {
664 let info = self.package(package).await?;
665
666 let registry_domain = self.get_warg_registry(package.namespace()).await?;
667
668 tracing::debug!(
669 package = package.as_ref(),
670 version_requirement = requirement.to_string(),
671 registry_header = ?registry_domain,
672 "downloading",
673 );
674
675 match info.state.find_latest_release(requirement) {
676 Some(release) => {
677 let digest = release
678 .content()
679 .context("invalid state: not yanked but missing content")?
680 .clone();
681 let path = self
682 .download_content(registry_domain.as_ref(), &digest)
683 .await?;
684 Ok(Some(PackageDownload {
685 version: release.version.clone(),
686 digest,
687 path,
688 }))
689 }
690 None => Ok(None),
691 }
692 }
693
694 pub async fn download_as_stream(
704 &self,
705 package: &PackageName,
706 requirement: &VersionReq,
707 ) -> Result<Option<(PackageDownloadInfo, impl Stream<Item = Result<Bytes>>)>, ClientError> {
708 let info = self.package(package).await?;
709
710 let registry_domain = self.get_warg_registry(package.namespace()).await?;
711
712 tracing::debug!(
713 package = package.as_ref(),
714 version_requirement = requirement.to_string(),
715 registry_header = ?registry_domain,
716 "downloading",
717 );
718
719 match info.state.find_latest_release(requirement) {
720 Some(release) => {
721 let digest = release
722 .content()
723 .context("invalid state: not yanked but missing content")?
724 .clone();
725 let stream = self
726 .download_content_stream(registry_domain.as_ref(), &digest)
727 .await?;
728 Ok(Some((
729 PackageDownloadInfo {
730 version: release.version.clone(),
731 digest,
732 },
733 stream,
734 )))
735 }
736 None => Ok(None),
737 }
738 }
739
740 pub async fn download_exact(
750 &self,
751 package: &PackageName,
752 version: &Version,
753 ) -> Result<PackageDownload, ClientError> {
754 let info = self.package(package).await?;
755
756 let registry_domain = self.get_warg_registry(package.namespace()).await?;
757
758 tracing::debug!(
759 package = package.as_ref(),
760 version = version.to_string(),
761 registry_header = ?registry_domain,
762 "downloading exact version",
763 );
764
765 let release =
766 info.state
767 .release(version)
768 .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
769 version: version.clone(),
770 name: package.clone(),
771 })?;
772
773 let digest = release
774 .content()
775 .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
776 version: version.clone(),
777 name: package.clone(),
778 })?;
779
780 Ok(PackageDownload {
781 version: version.clone(),
782 digest: digest.clone(),
783 path: self
784 .download_content(registry_domain.as_ref(), digest)
785 .await?,
786 })
787 }
788
789 pub async fn download_exact_as_stream(
796 &self,
797 package: &PackageName,
798 version: &Version,
799 ) -> Result<(PackageDownloadInfo, impl Stream<Item = Result<Bytes>>), ClientError> {
800 let info = self.package(package).await?;
801
802 let registry_domain = self.get_warg_registry(package.namespace()).await?;
803
804 tracing::debug!(
805 package = package.as_ref(),
806 version = version.to_string(),
807 registry_header = ?registry_domain,
808 "downloading exact version",
809 );
810
811 let release =
812 info.state
813 .release(version)
814 .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
815 version: version.clone(),
816 name: package.clone(),
817 })?;
818
819 let digest = release
820 .content()
821 .ok_or_else(|| ClientError::PackageVersionDoesNotExist {
822 version: version.clone(),
823 name: package.clone(),
824 })?;
825
826 Ok((
827 PackageDownloadInfo {
828 version: version.clone(),
829 digest: digest.clone(),
830 },
831 self.download_content_stream(registry_domain.as_ref(), digest)
832 .await?,
833 ))
834 }
835
836 async fn update_packages_and_return_federated_packages<'a>(
837 &self,
838 registry_domain: Option<&RegistryDomain>,
839 packages: impl IntoIterator<Item = &'a mut PackageInfo>,
840 ) -> Result<IndexMap<Option<RegistryDomain>, Vec<&'a mut PackageInfo>>, ClientError> {
841 let ts_checkpoint = self.api.latest_checkpoint(registry_domain).await?;
842 let checkpoint = &ts_checkpoint.as_ref().checkpoint;
843
844 tracing::debug!(
845 log_length = checkpoint.log_length,
846 registry_header = ?registry_domain,
847 "updating to checkpoint",
848 );
849
850 let mut operator = self
852 .registry
853 .load_operator(registry_domain)
854 .await?
855 .unwrap_or_default();
856
857 let mut packages = packages
859 .into_iter()
860 .filter_map(|p| match &p.checkpoint {
861 Some(c) if p.registry.is_some() && c == checkpoint => None,
864 _ => Some((LogId::package_log::<Sha256>(&p.name), p)),
865 })
866 .inspect(|(_, p)| tracing::info!("package `{name}` will be updated", name = p.name))
867 .collect::<IndexMap<_, _>>();
868
869 if operator.checkpoint.is_some_and(|c| &c == checkpoint) && packages.is_empty() {
871 return Ok(IndexMap::default());
872 }
873
874 let mut federated_packages: IndexMap<Option<RegistryDomain>, Vec<&mut PackageInfo>> =
876 IndexMap::with_capacity(packages.len());
877
878 let has_auth_token = self.api.auth_token().is_some();
880 loop {
881 let response = match self
882 .api
883 .fetch_logs(
884 registry_domain,
885 FetchLogsRequest {
886 log_length: checkpoint.log_length,
887 operator: operator
888 .head_fetch_token
889 .as_ref()
890 .map(|t| Cow::Borrowed(t.as_str())),
891 limit: None,
892 packages: Cow::Owned(
894 packages
895 .iter()
896 .map(|(id, p)| (id.clone(), p.head_fetch_token.clone()))
897 .collect::<IndexMap<_, _>>(),
898 ),
899 },
900 )
901 .await
902 .inspect(|res| {
903 for warning in res.warnings.iter() {
904 tracing::warn!("Fetch warning from registry: {}", warning.message);
905 }
906 }) {
907 Ok(res) => Ok(res),
908 Err(err) => match &err {
909 api::ClientError::Fetch(FetchError::LogNotFound(log_id))
910 | api::ClientError::Package(PackageError::LogNotFound(log_id)) => {
911 if let Some(name) = packages.get(log_id).map(|p| p.name.clone()) {
912 Err(ClientError::PackageDoesNotExist {
913 name,
914 has_auth_token,
915 })
916 } else {
917 Err(ClientError::Api(err))
918 }
919 }
920
921 api::ClientError::LogNotFoundWithHint(log_id, hint)
922 if self.disable_interactive =>
923 {
924 let name = packages.get(log_id).unwrap().name.clone();
925
926 match hint.to_str().ok().map(|s| s.split_once('=')) {
927 Some(Some((namespace, registry))) if packages.contains_key(log_id) => {
928 Err(ClientError::PackageDoesNotExistWithHintHeader {
929 name,
930 has_auth_token,
931 hint_namespace: namespace.to_string(),
932 hint_registry: registry.to_string(),
933 })
934 }
935 _ => Err(ClientError::PackageDoesNotExist {
936 name,
937 has_auth_token,
938 }),
939 }
940 }
941
942 #[cfg(feature = "cli-interactive")]
943 api::ClientError::LogNotFoundWithHint(log_id, hint) => {
944 match hint.to_str().ok().map(|s| s.split_once('=')) {
945 Some(Some((namespace, registry)))
946 if !self.ignore_federation_hints
947 && packages.contains_key(log_id) =>
948 {
949 use dialoguer::{theme::ColorfulTheme, Confirm};
950
951 let package_name = &packages.get(log_id).unwrap().name;
952
953 if !self.disable_auto_accept_federation_hints
954 || Confirm::with_theme(&ColorfulTheme::default())
955 .with_prompt(format!(
956"Package `{package_name}` is not in `{current_registry}` registry.
957Registry recommends using `{registry}` registry for packages in `{namespace}` namespace.
958Accept recommendation y/N\n",
959current_registry = registry_domain.map(|d| d.as_str()).unwrap_or(&self.url().safe_label()),
960))
961 .default(true)
962 .interact()
963 .unwrap()
964 {
965 let federated_registry_domain =
966 Some(RegistryDomain::from_str(registry)?);
967 self.store_namespace(
968 namespace.to_string(),
969 federated_registry_domain.clone().unwrap(),
970 )
971 .await?;
972
973 packages = packages
975 .into_iter()
976 .filter_map(|(log_id, package_info)| {
977 if package_info.name.namespace() == namespace {
978 if let Some(package_set) = federated_packages
979 .get_mut(&federated_registry_domain)
980 {
981 package_set.push(package_info);
982 } else {
983 federated_packages.insert(
984 federated_registry_domain.clone(),
985 vec![package_info],
986 );
987 }
988
989 None
990 } else {
991 Some((log_id, package_info))
992 }
993 })
994 .collect();
995
996 continue;
998 } else {
999 Err(ClientError::PackageDoesNotExist {
1000 name: package_name.clone(),
1001 has_auth_token,
1002 })
1003 }
1004 }
1005 _ => {
1006 if let Some(name) = packages.get(log_id).map(|p| p.name.clone()) {
1007 Err(ClientError::PackageDoesNotExist {
1008 name,
1009 has_auth_token,
1010 })
1011 } else {
1012 Err(ClientError::Api(err))
1013 }
1014 }
1015 }
1016 }
1017 _ => Err(ClientError::Api(err)),
1018 },
1019 }?;
1020
1021 for record in response.operator {
1022 let proto_envelope: PublishedProtoEnvelope<operator::OperatorRecord> =
1023 record.envelope.try_into()?;
1024
1025 if operator.head_registry_index.is_none()
1027 || proto_envelope.registry_index > operator.head_registry_index.unwrap()
1028 {
1029 operator.state = operator
1030 .state
1031 .validate(&proto_envelope.envelope)
1032 .map_err(|inner| ClientError::OperatorValidationFailed { inner })?;
1033 operator.head_registry_index = Some(proto_envelope.registry_index);
1034 operator.head_fetch_token = Some(record.fetch_token);
1035 }
1036 }
1037
1038 for (log_id, records) in response.packages {
1039 let package = packages.get_mut(&log_id).ok_or_else(|| {
1040 anyhow!("received records for unknown package log `{log_id}`")
1041 })?;
1042
1043 for record in records {
1044 let proto_envelope: PublishedProtoEnvelope<package::PackageRecord> =
1045 record.envelope.try_into()?;
1046
1047 if package.head_registry_index.is_none()
1049 || proto_envelope.registry_index > package.head_registry_index.unwrap()
1050 {
1051 let state = std::mem::take(&mut package.state);
1052 package.state =
1053 state.validate(&proto_envelope.envelope).map_err(|inner| {
1054 ClientError::PackageValidationFailed {
1055 name: package.name.clone(),
1056 inner,
1057 }
1058 })?;
1059 package.head_registry_index = Some(proto_envelope.registry_index);
1060 package.head_fetch_token = Some(record.fetch_token);
1061 }
1062 }
1063
1064 if package.state.head().is_none() {
1066 return Err(ClientError::PackageLogEmpty {
1067 name: package.name.clone(),
1068 });
1069 }
1070 }
1071
1072 if !response.more {
1073 break;
1074 }
1075 }
1076
1077 TimestampedCheckpoint::verify(
1079 operator
1080 .state
1081 .public_key(ts_checkpoint.to_owned().to_owned().key_id())
1082 .ok_or(ClientError::InvalidCheckpointKeyId {
1083 key_id: ts_checkpoint.key_id().clone(),
1084 })?,
1085 &ts_checkpoint.as_ref().encode(),
1086 ts_checkpoint.signature(),
1087 )
1088 .or(Err(ClientError::InvalidCheckpointSignature))?;
1089
1090 let mut leaf_indices = Vec::with_capacity(packages.len() + 1 );
1092 let mut leafs = Vec::with_capacity(leaf_indices.len());
1093
1094 if let Some(index) = operator.head_registry_index {
1096 leaf_indices.push(index);
1097 leafs.push(LogLeaf {
1098 log_id: LogId::operator_log::<Sha256>(),
1099 record_id: operator.state.head().as_ref().unwrap().digest.clone(),
1100 });
1101 } else {
1102 return Err(ClientError::NoOperatorRecords);
1103 }
1104
1105 for (log_id, package) in &packages {
1107 if let Some(index) = package.head_registry_index {
1108 leaf_indices.push(index);
1109 leafs.push(LogLeaf {
1110 log_id: log_id.clone(),
1111 record_id: package.state.head().as_ref().unwrap().digest.clone(),
1112 });
1113 } else {
1114 return Err(ClientError::PackageLogEmpty {
1115 name: package.name.clone(),
1116 });
1117 }
1118 }
1119
1120 if !leafs.is_empty() {
1121 self.api
1122 .prove_inclusion(
1123 registry_domain,
1124 InclusionRequest {
1125 log_length: checkpoint.log_length,
1126 leafs: leaf_indices,
1127 },
1128 checkpoint,
1129 &leafs,
1130 )
1131 .await?;
1132 }
1133
1134 if let Some(from) = self.registry.load_checkpoint(registry_domain).await? {
1135 let from_log_length = from.as_ref().checkpoint.log_length;
1136 let to_log_length = ts_checkpoint.as_ref().checkpoint.log_length;
1137
1138 match from_log_length.cmp(&to_log_length) {
1139 Ordering::Greater => {
1140 return Err(ClientError::CheckpointLogLengthRewind {
1141 from: from_log_length,
1142 to: to_log_length,
1143 });
1144 }
1145 Ordering::Less => {
1146 self.api
1147 .prove_log_consistency(
1148 registry_domain,
1149 ConsistencyRequest {
1150 from: from_log_length,
1151 to: to_log_length,
1152 },
1153 Cow::Borrowed(&from.as_ref().checkpoint.log_root),
1154 Cow::Borrowed(&ts_checkpoint.as_ref().checkpoint.log_root),
1155 )
1156 .await?
1157 }
1158 Ordering::Equal => {
1159 if from.as_ref().checkpoint.log_root
1160 != ts_checkpoint.as_ref().checkpoint.log_root
1161 || from.as_ref().checkpoint.map_root
1162 != ts_checkpoint.as_ref().checkpoint.map_root
1163 {
1164 return Err(ClientError::CheckpointChangedLogRootOrMapRoot {
1165 log_length: from_log_length,
1166 });
1167 }
1168 }
1169 }
1170 }
1171
1172 operator.registry = registry_domain
1173 .cloned()
1174 .or_else(|| Some(self.url().registry_domain()));
1175 operator.checkpoint = Some(checkpoint.clone()); self.registry
1177 .store_operator(registry_domain, operator)
1178 .await?;
1179
1180 for package in packages.values_mut() {
1181 package.registry = registry_domain
1182 .cloned()
1183 .or_else(|| Some(self.url().registry_domain()));
1184 package.checkpoint = Some(checkpoint.clone()); self.registry
1186 .store_package(registry_domain, package)
1187 .await?;
1188 }
1189
1190 self.registry
1191 .store_checkpoint(registry_domain, &ts_checkpoint)
1192 .await?;
1193
1194 Ok(federated_packages)
1196 }
1197
1198 async fn update_checkpoints<'a>(
1200 &self,
1201 packages: impl IntoIterator<Item = &mut PackageInfo>,
1202 ) -> Result<(), ClientError> {
1203 let mut federated_packages: IndexMap<Option<RegistryDomain>, Vec<&mut PackageInfo>> =
1205 IndexMap::new();
1206 for package in packages.into_iter() {
1207 let registry_domain = self.get_warg_registry(package.name.namespace()).await?;
1208 if let Some(package_set) = federated_packages.get_mut(®istry_domain) {
1209 package_set.push(package);
1210 } else {
1211 federated_packages.insert(registry_domain, vec![package]);
1212 }
1213 }
1214
1215 while let Some((registry_domain, packages)) = federated_packages.pop() {
1216 for (registry_domain, packages) in self
1217 .update_packages_and_return_federated_packages(registry_domain.as_ref(), packages)
1218 .await?
1219 .into_iter()
1220 {
1221 if let Some(package_set) = federated_packages.get_mut(®istry_domain) {
1222 package_set.extend(packages);
1223 } else {
1224 federated_packages.insert(registry_domain, packages);
1225 }
1226 }
1227 }
1228
1229 Ok(())
1230 }
1231
1232 pub async fn fetch_packages(
1234 &self,
1235 names: impl IntoIterator<Item = &PackageName>,
1236 ) -> Result<Vec<PackageInfo>, ClientError> {
1237 let mut packages: Vec<PackageInfo> = names
1238 .into_iter()
1239 .map(|name| PackageInfo::new(name.clone()))
1240 .collect();
1241 self.update_checkpoints(packages.iter_mut()).await?;
1242 Ok(packages)
1243 }
1244
1245 pub async fn fetch_package(&self, name: &PackageName) -> Result<PackageInfo, ClientError> {
1247 let mut info = PackageInfo::new(name.clone());
1248 self.update_checkpoints([&mut info]).await?;
1249 Ok(info)
1250 }
1251
1252 pub async fn package(&self, name: &PackageName) -> Result<PackageInfo, ClientError> {
1255 let registry_domain = self.get_warg_registry(name.namespace()).await?;
1256 match self
1257 .registry
1258 .load_package(registry_domain.as_ref(), name)
1259 .await?
1260 {
1261 Some(mut info) => {
1262 tracing::info!("log for package `{name}` already exists in storage");
1263 if info.registry.is_none() {
1264 info.registry = registry_domain
1265 .clone()
1266 .or_else(|| Some(self.url().registry_domain()));
1267 }
1268 Ok(info)
1269 }
1270 None => {
1271 let mut info = PackageInfo::new(name.clone());
1272 self.update_checkpoints([&mut info]).await?;
1273 Ok(info)
1274 }
1275 }
1276 }
1277
1278 async fn get_package_record(
1279 &self,
1280 registry_domain: Option<&RegistryDomain>,
1281 package: &PackageName,
1282 log_id: &LogId,
1283 record_id: &RecordId,
1284 ) -> ClientResult<PackageRecord> {
1285 let record = self
1286 .api
1287 .get_package_record(registry_domain, log_id, record_id)
1288 .await
1289 .map_err(|e| match e {
1290 api::ClientError::Package(PackageError::Rejection(reason)) => {
1291 ClientError::PublishRejected {
1292 name: package.clone(),
1293 reason,
1294 record_id: record_id.clone(),
1295 }
1296 }
1297 e => {
1298 ClientError::translate_log_not_found(e, self.api.auth_token().is_some(), |id| {
1299 if id == log_id {
1300 Some(package.clone())
1301 } else {
1302 None
1303 }
1304 })
1305 }
1306 })?;
1307 Ok(record)
1308 }
1309
1310 async fn download_content(
1315 &self,
1316 registry_domain: Option<&RegistryDomain>,
1317 digest: &AnyHash,
1318 ) -> Result<PathBuf, ClientError> {
1319 match self.content.content_location(digest) {
1320 Some(path) => {
1321 tracing::info!("content for digest `{digest}` already exists in storage");
1322 Ok(path)
1323 }
1324 None => {
1325 self.content
1326 .store_content(
1327 Box::pin(self.api.download_content(registry_domain, digest).await?),
1328 Some(digest),
1329 )
1330 .await?;
1331
1332 self.content
1333 .content_location(digest)
1334 .ok_or_else(|| ClientError::ContentNotFound {
1335 digest: digest.clone(),
1336 })
1337 }
1338 }
1339 }
1340
1341 async fn download_content_stream(
1347 &self,
1348 registry_domain: Option<&RegistryDomain>,
1349 digest: &AnyHash,
1350 ) -> Result<impl Stream<Item = Result<Bytes>>, ClientError> {
1351 match self.content.content_location(digest) {
1352 Some(path) => {
1353 tracing::info!("content for digest `{digest}` already exists in storage");
1354 let file = tokio::fs::File::open(path)
1355 .await
1356 .map_err(ClientError::IoError)?;
1357 Ok(ReaderStream::new(file).map_err(Into::into).boxed())
1358 }
1359 None => Ok(Box::pin(
1360 self.api.download_content(registry_domain, digest).await?,
1361 )),
1362 }
1363 }
1364}
1365pub type FileSystemClient =
1368 Client<FileSystemRegistryStorage, FileSystemContentStorage, FileSystemNamespaceMapStorage>;
1369
1370pub enum StorageLockResult<T> {
1372 Acquired(T),
1374 NotAcquired(PathBuf),
1376}
1377
1378impl FileSystemClient {
1379 async fn storage_paths(
1380 url: Option<&str>,
1381 config: &Config,
1382 disable_interactive: bool,
1383 ) -> Result<StoragePaths, ClientError> {
1384 let checking_url_for_well_known = RegistryUrl::new(
1385 url.or(config.home_url.as_deref())
1386 .unwrap_or(DEFAULT_REGISTRY),
1387 )?;
1388
1389 let url = if let Some(warg_url) =
1390 api::Client::new(checking_url_for_well_known.to_string(), None)?
1391 .well_known_config()
1392 .await?
1393 {
1394 if !disable_interactive && warg_url != checking_url_for_well_known {
1395 println!(
1396 "Resolved `{well_known}` to registry hosted on `{registry}`",
1397 well_known = checking_url_for_well_known.registry_domain(),
1398 registry = warg_url.registry_domain(),
1399 );
1400 }
1401 warg_url
1402 } else {
1403 RegistryUrl::new(
1404 url.or(config.home_url.as_deref())
1405 .ok_or(ClientError::NoHomeRegistryUrl)?,
1406 )?
1407 };
1408
1409 config.storage_paths_for_url(url)
1410 }
1411
1412 pub async fn try_new_with_config(
1421 registry: Option<&str>,
1422 config: &Config,
1423 mut auth_token: Option<Secret<String>>,
1424 ) -> Result<StorageLockResult<Self>, ClientError> {
1425 let disable_interactive =
1426 cfg!(not(feature = "cli-interactive")) || config.disable_interactive;
1427
1428 let StoragePaths {
1429 registry_url: url,
1430 registries_dir,
1431 content_dir,
1432 namespace_map_path,
1433 } = Self::storage_paths(registry, config, disable_interactive).await?;
1434
1435 let (keyring_backend, keys) = if cfg!(feature = "keyring") {
1436 (config.keyring_backend.clone(), config.keys.clone())
1437 } else {
1438 (None, IndexSet::new())
1439 };
1440
1441 #[cfg(feature = "keyring")]
1442 if auth_token.is_none() && config.keyring_auth {
1443 auth_token = crate::keyring::Keyring::from_config(config)?.get_auth_token(&url)?
1444 }
1445
1446 let (packages, content, namespace_map) = match (
1447 FileSystemRegistryStorage::try_lock(registries_dir.clone())?,
1448 FileSystemContentStorage::try_lock(content_dir.clone())?,
1449 FileSystemNamespaceMapStorage::new(namespace_map_path.clone()),
1450 ) {
1451 (Some(packages), Some(content), namespace_map) => (packages, content, namespace_map),
1452 (None, _, _) => return Ok(StorageLockResult::NotAcquired(registries_dir)),
1453 (_, None, _) => return Ok(StorageLockResult::NotAcquired(content_dir)),
1454 };
1455
1456 Ok(StorageLockResult::Acquired(Self::new(
1457 url.into_url(),
1458 packages,
1459 content,
1460 namespace_map,
1461 auth_token,
1462 config.ignore_federation_hints,
1463 config.disable_auto_accept_federation_hints,
1464 config.disable_auto_package_init,
1465 disable_interactive,
1466 keyring_backend,
1467 keys,
1468 )?))
1469 }
1470
1471 pub async fn try_new_with_default_config(
1483 url: Option<&str>,
1484 ) -> Result<StorageLockResult<Self>, ClientError> {
1485 Self::try_new_with_config(url, &Config::from_default_file()?.unwrap_or_default(), None)
1486 .await
1487 }
1488
1489 pub async fn new_with_config(
1496 registry: Option<&str>,
1497 config: &Config,
1498 mut auth_token: Option<Secret<String>>,
1499 ) -> Result<Self, ClientError> {
1500 let disable_interactive =
1501 cfg!(not(feature = "cli-interactive")) || config.disable_interactive;
1502
1503 let StoragePaths {
1504 registry_url: url,
1505 registries_dir,
1506 content_dir,
1507 namespace_map_path,
1508 } = Self::storage_paths(registry, config, disable_interactive).await?;
1509
1510 let (keyring_backend, keys) = if cfg!(feature = "keyring") {
1511 (config.keyring_backend.clone(), config.keys.clone())
1512 } else {
1513 (None, IndexSet::new())
1514 };
1515
1516 #[cfg(feature = "keyring")]
1517 if auth_token.is_none() && config.keyring_auth {
1518 auth_token = crate::keyring::Keyring::from_config(config)?.get_auth_token(&url)?
1519 }
1520
1521 Self::new(
1522 url.into_url(),
1523 FileSystemRegistryStorage::lock(registries_dir)?,
1524 FileSystemContentStorage::lock(content_dir)?,
1525 FileSystemNamespaceMapStorage::new(namespace_map_path),
1526 auth_token,
1527 config.ignore_federation_hints,
1528 config.disable_auto_accept_federation_hints,
1529 config.disable_auto_package_init,
1530 disable_interactive,
1531 keyring_backend,
1532 keys,
1533 )
1534 }
1535
1536 pub async fn new_with_default_config(url: Option<&str>) -> Result<Self, ClientError> {
1546 Self::new_with_config(url, &Config::from_default_file()?.unwrap_or_default(), None).await
1547 }
1548}
1549
1550#[derive(Debug, Clone)]
1552pub struct PackageDownload {
1553 pub version: Version,
1555 pub digest: AnyHash,
1557 pub path: PathBuf,
1559}
1560
1561pub struct PackageDownloadInfo {
1563 pub version: Version,
1565 pub digest: AnyHash,
1567}
1568
1569#[derive(Debug, Error)]
1571pub enum ClientError {
1572 #[error("no home registry registry server URL is configured")]
1574 NoHomeRegistryUrl,
1575
1576 #[error("reset registry state failed")]
1578 ResettingRegistryLocalStateFailed,
1579
1580 #[error("clear content cache failed")]
1582 ClearContentCacheFailed,
1583
1584 #[error("unauthorized: {0}")]
1586 Unauthorized(String),
1587
1588 #[error("invalid checkpoint signature")]
1590 InvalidCheckpointSignature,
1591
1592 #[error("invalid checkpoint key ID `{key_id}`")]
1594 InvalidCheckpointKeyId {
1595 key_id: signing::KeyID,
1597 },
1598
1599 #[error("the server did not provide any operator records")]
1601 NoOperatorRecords,
1602
1603 #[error("operator failed validation: {inner}")]
1605 OperatorValidationFailed {
1606 inner: operator::ValidationError,
1608 },
1609
1610 #[error("package `{name}` already exists and cannot be initialized")]
1612 CannotInitializePackage {
1613 name: PackageName,
1615 init_record_id: Option<RecordId>,
1617 },
1618
1619 #[error("package `{name}` must be initialized before publishing")]
1621 MustInitializePackage {
1622 name: PackageName,
1624 has_auth_token: bool,
1626 },
1627
1628 #[error("there is no publish operation in progress")]
1630 NotPublishing,
1631
1632 #[error("package `{name}` has no records to publish")]
1634 NothingToPublish {
1635 name: PackageName,
1637 },
1638
1639 #[error("package `{name}` does not exist")]
1641 PackageDoesNotExist {
1642 name: PackageName,
1644 has_auth_token: bool,
1646 },
1647
1648 #[error("package `{name}` does not exist but the registry suggests checking registry `{hint_registry}` for packages in namespace `{hint_namespace}`")]
1650 PackageDoesNotExistWithHintHeader {
1651 name: PackageName,
1653 has_auth_token: bool,
1655 hint_namespace: String,
1657 hint_registry: String,
1659 },
1660
1661 #[error("version `{version}` of package `{name}` does not exist")]
1663 PackageVersionDoesNotExist {
1664 version: Version,
1666 name: PackageName,
1668 },
1669
1670 #[error("version that satisfies requirement `{version}` was not found for package `{name}`")]
1672 PackageVersionRequirementDoesNotExist {
1673 version: VersionReq,
1675 name: PackageName,
1677 },
1678
1679 #[error("package `{name}` failed validation: {inner}")]
1681 PackageValidationFailed {
1682 name: PackageName,
1684 inner: package::ValidationError,
1686 },
1687
1688 #[error("content with digest `{digest}` was not found in client storage")]
1690 ContentNotFound {
1691 digest: AnyHash,
1693 },
1694
1695 #[error("content with digest `{digest}` was not found expected `{expected}`")]
1697 IncorrectContent {
1698 digest: AnyHash,
1700 expected: AnyHash,
1702 },
1703
1704 #[error("package log is empty and cannot be validated")]
1706 PackageLogEmpty {
1707 name: PackageName,
1709 },
1710
1711 #[error("the publishing of package `{name}` was rejected due to: {reason}")]
1713 PublishRejected {
1714 name: PackageName,
1716 record_id: RecordId,
1718 reason: String,
1720 },
1721
1722 #[error("the publishing of package `{name}` was rejected due to conflicting pending publish of record `{pending_record_id}`")]
1724 ConflictPendingPublish {
1725 name: PackageName,
1727 record_id: RecordId,
1729 pending_record_id: RecordId,
1731 },
1732
1733 #[error("the package is still missing content after all content was uploaded")]
1735 PackageMissingContent,
1736
1737 #[error("registry rewinded checkpoints; latest checkpoint log length `{to}` is less than previously received checkpoint log length `{from}`")]
1740 CheckpointLogLengthRewind {
1741 from: RegistryLen,
1743 to: RegistryLen,
1745 },
1746
1747 #[error("registry provided a new checkpoint with the same log length `{log_length}` as previously fetched but different log root or map root")]
1750 CheckpointChangedLogRootOrMapRoot {
1751 log_length: RegistryLen,
1753 },
1754
1755 #[error(transparent)]
1757 Keyring(#[from] crate::keyring::KeyringError),
1758
1759 #[error(transparent)]
1761 Api(#[from] api::ClientError),
1762
1763 #[error("{0:?}")]
1765 Other(#[from] anyhow::Error),
1766
1767 #[error("error: {0:?}")]
1769 IoError(#[from] std::io::Error),
1770}
1771
1772impl ClientError {
1773 fn translate_log_not_found(
1774 e: api::ClientError,
1775 has_auth_token: bool,
1776 lookup: impl Fn(&LogId) -> Option<PackageName>,
1777 ) -> Self {
1778 match &e {
1779 api::ClientError::Fetch(FetchError::LogNotFound(id))
1780 | api::ClientError::Package(PackageError::LogNotFound(id)) => {
1781 if let Some(name) = lookup(id) {
1782 return Self::PackageDoesNotExist {
1783 name,
1784 has_auth_token,
1785 };
1786 }
1787 }
1788 _ => {}
1789 }
1790
1791 Self::Api(e)
1792 }
1793}
1794
1795pub type ClientResult<T> = Result<T, ClientError>;