1use std::collections::BTreeMap;
2use std::path::PathBuf;
3
4use tracing::log;
5
6use crate::Error;
7use crate::Res;
8use crate::error::LoginError;
9use crate::error::PackageOpError;
10use crate::flow;
11use crate::flow::cache_remote_manifest;
12use crate::io::remote::HostConfig;
13use crate::io::remote::Remote;
14use crate::io::remote::RemoteS3;
15use crate::io::remote::resolve_workflow;
16use crate::io::storage::LocalStorage;
17use crate::io::storage::Storage;
18use crate::lineage;
19use crate::lineage::CommitState;
20use crate::lineage::InstalledPackageStatus;
21use crate::lineage::LineagePaths;
22use crate::manifest::Manifest;
23use crate::manifest::Workflow;
24use crate::paths;
25use crate::paths::copy_cached_to_installed;
26use quilt_uri::Host;
27use quilt_uri::ManifestUri;
28use quilt_uri::Namespace;
29use quilt_uri::S3Uri;
30use quilt_uri::UriError;
31
/// Result returned by [`InstalledPackage::push`] and carried inside
/// [`PublishOutcome`].
pub struct PushOutcome {
    /// Remote manifest URI read back from the lineage after the push result
    /// has been persisted.
    pub manifest_uri: ManifestUri,
    /// Copied verbatim from the push flow's `certified_latest` flag.
    // NOTE(review): presumably `true` when the pushed revision was also
    // certified as the remote's latest — confirm against `flow::push`.
    pub certified_latest: bool,
}
40
/// [`flow::PublishOutcome`] specialised to carry a [`PushOutcome`] in each of
/// its variants (`CommittedAndPushed` and `PushedOnly`).
pub type PublishOutcome = flow::PublishOutcome<PushOutcome>;
45
/// Handle for a single package identified by `namespace`.
///
/// It bundles what the operations below need: the lineage reader/writer,
/// the domain path layout, a storage backend and a remote backend. The type
/// parameters default to [`LocalStorage`] and [`RemoteS3`].
#[derive(Debug)]
pub struct InstalledPackage<S: Storage = LocalStorage, R: Remote = RemoteS3> {
    /// Reads and writes this package's lineage record.
    pub lineage: lineage::PackageLineageIo,
    /// Path layout used to locate installed and cached manifests.
    pub paths: paths::DomainPaths,
    /// Remote backend (host config lookup, bucket verification, manifest fetch).
    pub remote: R,
    /// Storage backend every read and write goes through.
    pub storage: S,
    /// Namespace of the package this handle operates on.
    pub namespace: Namespace,
}
58
59impl std::fmt::Display for InstalledPackage {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(f, r#"Installed package "{}""#, self.namespace)
62 }
63}
64
65impl<S: Storage + Sync, R: Remote> InstalledPackage<S, R> {
66 pub async fn scaffold_paths(&self) -> Res {
67 let home = self.lineage.domain_home(&self.storage).await?;
68 self.paths
69 .scaffold_for_installing(&self.storage, &home, &self.namespace)
70 .await
71 }
72
73 pub async fn scaffold_paths_for_caching(&self, bucket: &str) -> Res {
74 self.paths.scaffold_for_caching(&self.storage, bucket).await
75 }
76
77 pub async fn manifest(&self) -> Res<Manifest> {
78 let (_, lineage) = self.lineage.read(&self.storage).await?;
79 let hash = match lineage.current_hash() {
80 Some(h) => h,
81 None => return Ok(Manifest::default()),
82 };
83 let installed_path = self.paths.installed_manifest(&self.namespace, hash);
84 match Manifest::from_path(&self.storage, &installed_path).await {
85 Ok(manifest) => return Ok(manifest),
86
87 Err(e) => {
88 log::warn!(
89 "Failed to read installed manifest at {}: {}",
90 installed_path.display(),
91 e
92 );
93 }
94 }
95
96 match lineage.remote_uri.as_ref() {
98 Some(remote_uri) => {
99 log::info!("Attempting to recover from cache at {remote_uri}");
100 let cached_manifest =
101 cache_remote_manifest(&self.paths, &self.storage, &self.remote, remote_uri)
102 .await?;
103 copy_cached_to_installed(&self.paths, &self.storage, remote_uri).await?;
104 Ok(cached_manifest)
105 }
106 None => Err(Error::Uri(UriError::ManifestPath(
107 "No installed manifest and no remote to recover from".to_string(),
108 ))),
109 }
110 }
111
112 pub async fn lineage(&self) -> Res<lineage::PackageLineage> {
113 let (_, lineage) = self.lineage.read(&self.storage).await?;
114 Ok(lineage)
115 }
116
117 pub async fn package_home(&self) -> Res<PathBuf> {
118 self.lineage.package_home(&self.storage).await
119 }
120
121 pub async fn status(&self, host_config_opt: Option<HostConfig>) -> Res<InstalledPackageStatus> {
122 let (package_home, lineage) = self.lineage.read(&self.storage).await?;
123
124 let lineage = match lineage.remote_uri.as_ref() {
126 Some(_) => match flow::refresh_latest_hash(lineage.clone(), &self.remote).await {
127 Ok(lineage) => lineage,
128 Err(Error::Login(LoginError::Required(_))) => {
129 return Err(Error::Login(LoginError::Required(
130 lineage.remote_uri.as_ref().and_then(|r| r.origin.clone()),
131 )));
132 }
133 Err(err) => {
134 log::warn!("Failed to refresh latest hash: {err}");
135 lineage
136 }
137 },
138 None => lineage,
139 };
140 let manifest = self.manifest().await?;
141
142 let host_config = match host_config_opt {
143 Some(hc) => hc,
144 None => match lineage.remote_uri.as_ref() {
145 Some(remote_uri) if !remote_uri.bucket.is_empty() => {
146 self.remote.host_config(&remote_uri.origin).await?
147 }
148 _ => HostConfig::default(),
149 },
150 };
151
152 let (lineage, status) = flow::status(
153 lineage,
154 &self.storage,
155 &manifest,
156 &package_home,
157 host_config,
158 )
159 .await?;
160 self.lineage.write(&self.storage, lineage).await?;
161 Ok(status)
162 }
163
164 pub async fn install_paths(&self, paths: &[PathBuf]) -> Res<LineagePaths> {
165 if paths.is_empty() {
166 return Ok(BTreeMap::new());
167 }
168
169 self.scaffold_paths().await?;
170
171 let (package_home, lineage) = self.lineage.read(&self.storage).await?;
172 let remote_uri = lineage.remote()?;
173
174 self.scaffold_paths_for_caching(&remote_uri.bucket).await?;
175
176 let mut manifest = self.manifest().await?;
177 let lineage = flow::install_paths(
178 lineage,
179 &mut manifest,
180 &self.paths,
181 package_home,
182 self.namespace.clone(),
183 &self.storage,
184 &self.remote,
185 &paths.iter().collect::<Vec<&PathBuf>>(),
186 )
187 .await?;
188 let lineage = self.lineage.write(&self.storage, lineage).await?;
189 Ok(lineage.paths)
190 }
191
192 pub async fn uninstall_paths(&self, paths: &Vec<PathBuf>) -> Res<LineagePaths> {
193 let (package_home, lineage) = self.lineage.read(&self.storage).await?;
194 let lineage = flow::uninstall_paths(lineage, package_home, &self.storage, paths).await?;
195 let lineage = self.lineage.write(&self.storage, lineage).await?;
196 Ok(lineage.paths)
197 }
198
199 pub async fn revert_paths(&self, paths: &Vec<String>) -> Res {
200 log::debug!("revert_paths: {paths:?}");
201 unimplemented!()
202 }
203
204 pub async fn commit(
205 &self,
206 message: String,
207 user_meta: Option<serde_json::Value>,
208 workflow: Option<Workflow>,
209 host_config_opt: Option<HostConfig>,
210 ) -> Res<CommitState> {
211 self.scaffold_paths().await?;
212
213 let (package_home, lineage) = self.lineage.read(&self.storage).await?;
214 let mut manifest = self.manifest().await?;
215
216 let host_config = match host_config_opt {
217 Some(hc) => hc,
218 None => match lineage.remote_uri.as_ref() {
219 Some(remote_uri) if !remote_uri.bucket.is_empty() => {
220 self.remote.host_config(&remote_uri.origin).await?
221 }
222 _ => HostConfig::default(),
223 },
224 };
225
226 let (lineage, status) = flow::status(
227 lineage,
228 &self.storage,
229 &manifest,
230 &package_home,
231 host_config,
232 )
233 .await?;
234
235 let (lineage, commit) = flow::commit(
236 lineage,
237 &mut manifest,
238 &self.paths,
239 &self.storage,
240 package_home,
241 status,
242 self.namespace.clone(),
243 message,
244 user_meta,
245 workflow,
246 )
247 .await?;
248 self.lineage.write(&self.storage, lineage).await?;
249 Ok(commit)
250 }
251
252 pub async fn publish(
264 &self,
265 message: String,
266 user_meta: Option<serde_json::Value>,
267 workflow: Option<Workflow>,
268 host_config_opt: Option<HostConfig>,
269 status_opt: Option<InstalledPackageStatus>,
270 ) -> Res<PublishOutcome> {
271 self.scaffold_paths().await?;
272
273 let (package_home, lineage) = self.lineage.read(&self.storage).await?;
274 let remote_uri = match lineage.remote_uri.as_ref() {
275 Some(uri) if !uri.bucket.is_empty() => uri.clone(),
276 Some(_) => {
277 return Err(Error::PackageOp(PackageOpError::Publish(
278 "Remote bucket not set. Use set_remote first.".to_string(),
279 )));
280 }
281 None => {
282 return Err(Error::PackageOp(PackageOpError::Publish(
283 "No remote configured. Use set_remote first.".to_string(),
284 )));
285 }
286 };
287
288 self.scaffold_paths_for_caching(&remote_uri.bucket).await?;
289
290 let mut manifest = self.manifest().await?;
291 let host_config =
292 host_config_opt.unwrap_or(self.remote.host_config(&remote_uri.origin).await?);
293
294 let (lineage, status) = match status_opt {
295 Some(status) => (lineage, status),
296 None => {
297 flow::status(
298 lineage,
299 &self.storage,
300 &manifest,
301 &package_home,
302 host_config.clone(),
303 )
304 .await?
305 }
306 };
307
308 let outcome = flow::publish(
309 lineage,
310 &mut manifest,
311 &self.paths,
312 &self.storage,
313 &self.remote,
314 package_home,
315 status,
316 self.namespace.clone(),
317 host_config,
318 flow::CommitOptions {
319 message,
320 user_meta,
321 workflow,
322 },
323 )
324 .await?;
325
326 let (committed, push_result) = match outcome {
327 flow::PublishOutcome::CommittedAndPushed(p) => (true, p),
328 flow::PublishOutcome::PushedOnly(p) => (false, p),
329 };
330 let certified_latest = push_result.certified_latest;
331 let lineage = self
332 .lineage
333 .write(&self.storage, push_result.lineage)
334 .await?;
335 let push = PushOutcome {
336 manifest_uri: lineage.remote()?.clone(),
337 certified_latest,
338 };
339 Ok(if committed {
340 PublishOutcome::CommittedAndPushed(push)
341 } else {
342 PublishOutcome::PushedOnly(push)
343 })
344 }
345
346 pub async fn push(&self, host_config_opt: Option<HostConfig>) -> Res<PushOutcome> {
348 self.scaffold_paths().await?;
349
350 let (_, lineage) = self.lineage.read(&self.storage).await?;
351 let remote_uri = match lineage.remote_uri.as_ref() {
352 Some(uri) if !uri.bucket.is_empty() => uri.clone(),
353 Some(_) => {
354 return Err(Error::PackageOp(PackageOpError::Push(
355 "Remote bucket not set. Use set_remote first.".to_string(),
356 )));
357 }
358 None => {
359 return Err(Error::PackageOp(PackageOpError::Push(
360 "No remote configured. Use set_remote first.".to_string(),
361 )));
362 }
363 };
364
365 if lineage.commit.is_none() {
366 return Err(Error::PackageOp(PackageOpError::Push(
367 "No commits to push".to_string(),
368 )));
369 }
370
371 self.scaffold_paths_for_caching(&remote_uri.bucket).await?;
372
373 let manifest = self.manifest().await?;
374
375 let host_config =
376 host_config_opt.unwrap_or(self.remote.host_config(&remote_uri.origin).await?);
377
378 let result = flow::push(
379 lineage,
380 manifest,
381 &self.paths,
382 &self.storage,
383 &self.remote,
384 Some(self.namespace.clone()),
385 host_config,
386 )
387 .await?;
388 let certified_latest = result.certified_latest;
389 let lineage = self.lineage.write(&self.storage, result.lineage).await?;
390 Ok(PushOutcome {
391 manifest_uri: lineage.remote()?.clone(),
392 certified_latest,
393 })
394 }
395
396 pub async fn pull(&self, host_config_opt: Option<HostConfig>) -> Res<ManifestUri> {
397 self.scaffold_paths().await?;
398
399 let (package_home, lineage) = self.lineage.read(&self.storage).await?;
400 let remote_uri = lineage.remote()?.clone();
401
402 self.scaffold_paths_for_caching(&remote_uri.bucket).await?;
403
404 let mut manifest = self.manifest().await?;
405
406 let host_config =
407 host_config_opt.unwrap_or(self.remote.host_config(&remote_uri.origin).await?);
408
409 let (lineage, status) = flow::status(
410 lineage,
411 &self.storage,
412 &manifest,
413 &package_home,
414 host_config,
415 )
416 .await?;
417 let lineage = flow::pull(
418 lineage,
419 &mut manifest,
420 &self.paths,
421 &self.storage,
422 &self.remote,
423 package_home,
424 status,
425 self.namespace.clone(),
426 )
427 .await?;
428 let lineage = self.lineage.write(&self.storage, lineage).await?;
429 Ok(lineage.remote()?.clone())
430 }
431
432 pub async fn certify_latest(&self) -> Res<ManifestUri> {
433 let (_, lineage) = self.lineage.read(&self.storage).await?;
434 let latest_manifest_uri = lineage.remote()?.clone();
435 let lineage = flow::certify_latest(lineage, &self.remote, latest_manifest_uri).await?;
436 let lineage = self.lineage.write(&self.storage, lineage).await?;
437 Ok(lineage.remote()?.clone())
438 }
439
440 pub async fn reset_to_latest(&self) -> Res<ManifestUri> {
441 self.scaffold_paths().await?;
442
443 let (package_home, lineage) = self.lineage.read(&self.storage).await?;
444 let remote_uri = lineage.remote()?.clone();
445
446 self.scaffold_paths_for_caching(&remote_uri.bucket).await?;
447
448 let mut manifest = self.manifest().await?;
449 let lineage = flow::reset_to_latest(
450 lineage,
451 &mut manifest,
452 &self.paths,
453 &self.storage,
454 &self.remote,
455 package_home,
456 self.namespace.clone(),
457 )
458 .await?;
459 let lineage = self.lineage.write(&self.storage, lineage).await?;
460 Ok(lineage.remote()?.clone())
461 }
462
463 pub async fn set_remote(&self, bucket: String, origin: Option<Host>) -> Res {
464 if bucket.is_empty() {
465 return Err(Error::PackageOp(PackageOpError::Push(
466 "Bucket cannot be empty".to_string(),
467 )));
468 }
469 let (_, mut lineage) = self.lineage.read(&self.storage).await?;
470 if let Some(existing) = &lineage.remote_uri
471 && !existing.hash.is_empty()
472 {
473 let same_remote = existing.bucket == bucket && existing.origin == origin;
474 if same_remote {
475 return Ok(());
476 }
477 return Err(Error::PackageOp(PackageOpError::Push(
478 "Cannot change remote on a package that has already been pushed".to_string(),
479 )));
480 }
481 self.remote.verify_bucket(&bucket).await?;
486 lineage.remote_uri = Some(ManifestUri {
487 origin: origin.clone(),
488 bucket: bucket.clone(),
489 namespace: self.namespace.clone(),
490 hash: String::new(),
491 });
492 self.lineage.write(&self.storage, lineage.clone()).await?;
495
496 if let Some(origin) = origin
501 && lineage.commit.is_some()
502 && let Err(err) = self.recommit_for_remote(lineage, origin, bucket).await
503 {
504 log::warn!("Remote saved but recommit failed (will retry on push): {err}");
505 }
506
507 Ok(())
508 }
509
510 async fn recommit_for_remote(
511 &self,
512 lineage: lineage::PackageLineage,
513 origin: Host,
514 bucket: String,
515 ) -> Res {
516 let host_config = self.remote.host_config(&Some(origin.clone())).await?;
517 let workflows_config_uri = S3Uri {
518 key: ".quilt/workflows/config.yml".to_string(),
519 bucket,
520 version: None,
521 };
522 let workflow =
523 resolve_workflow(&self.remote, &Some(origin), None, &workflows_config_uri).await?;
524 let manifest = self.manifest().await?;
525 let lineage = flow::recommit(
526 lineage,
527 &manifest,
528 &self.paths,
529 &self.storage,
530 self.namespace.clone(),
531 host_config,
532 workflow,
533 )
534 .await?;
535 self.lineage.write(&self.storage, lineage).await?;
536 Ok(())
537 }
538
539 pub async fn resolve_workflow(&self, workflow_id: Option<String>) -> Res<Option<Workflow>> {
540 let (_, lineage) = self.lineage.read(&self.storage).await?;
541 let remote_uri = match lineage.remote_uri.as_ref() {
542 Some(uri) if !uri.bucket.is_empty() => uri.clone(),
543 _ => return Ok(None),
544 };
545 let workflows_config_uri = S3Uri {
546 key: ".quilt/workflows/config.yml".to_string(),
547 ..S3Uri::from(remote_uri.clone())
548 };
549 resolve_workflow(
550 &self.remote,
551 &remote_uri.origin,
552 workflow_id,
553 &workflows_config_uri,
554 )
555 .await
556 }
557}
558
559#[cfg(test)]
560mod tests {
561 use super::*;
562
563 use test_log::test;
564
565 use aws_sdk_s3::primitives::ByteStream;
566
567 use crate::io::remote::mocks::MockRemote;
568 use crate::io::storage::StorageExt;
569 use crate::lineage::DomainLineageIo;
570 use crate::lineage::Home;
571 use crate::lineage::PackageLineageIo;
572 use crate::paths::DomainPaths;
573
574 #[test(tokio::test)]
575 async fn test_spamming_commit_writes() -> Res {
576 let (home, _temp_dir1) = Home::from_temp_dir()?;
577 let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
578
579 let storage = LocalStorage::new();
580 let remote = MockRemote::default();
581 let namespace: Namespace = ("test", "history").into();
582 let test_hash = "deadbeef".to_string();
583
584 paths
585 .scaffold_for_installing(&storage, &home, &namespace)
586 .await?;
587 let lineage_json = format!(
589 r#"{{
590 "packages": {{
591 "test/history": {{
592 "commit": null,
593 "remote": {{
594 "bucket": "bucket",
595 "namespace": "test/history",
596 "hash": "{}",
597 "catalog": "test.quilt.dev"
598 }},
599 "base_hash": "{}",
600 "latest_hash": "{}",
601 "paths": {{}}
602 }}}},
603 "home": "/tmp/working_dir"
604 }}"#,
605 test_hash, "foo", "bar"
606 );
607 storage
608 .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
609 .await?;
610
611 let test_manifest_path = paths.installed_manifest(&namespace, &test_hash);
613 let test_manifest = r#"{"version": "v0"}"#;
614 storage
615 .write_byte_stream(
616 &test_manifest_path,
617 ByteStream::from_static(test_manifest.as_bytes()),
618 )
619 .await?;
620
621 let domain_lineage_io = DomainLineageIo::new(paths.lineage());
622
623 let package = InstalledPackage {
624 lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
625 paths,
626 remote,
627 storage,
628 namespace,
629 };
630
631 let mut expected_hashes = Vec::new();
633 for i in 0..10 {
634 let commit = package
635 .commit(
636 format!("Commit new1 {i}"),
637 Some(serde_json::json!({ "count": i })),
638 None,
639 None,
640 )
641 .await?;
642 expected_hashes.insert(i, commit.hash);
643 }
644
645 expected_hashes.pop();
647
648 let commit_state = package.lineage().await?.commit.unwrap();
649
650 assert_eq!(commit_state.prev_hashes.len(), 9);
651 assert_eq!(
653 commit_state.prev_hashes,
654 expected_hashes.into_iter().rev().collect::<Vec<String>>()
655 );
656
657 Ok(())
658 }
659
660 #[test(tokio::test)]
661 async fn test_set_remote_on_local_package() -> Res {
662 let (home, _temp_dir1) = Home::from_temp_dir()?;
663 let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
664
665 let storage = LocalStorage::new();
666 let remote = MockRemote::default();
667 let namespace: Namespace = ("test", "local").into();
668
669 paths
670 .scaffold_for_installing(&storage, &home, &namespace)
671 .await?;
672
673 let lineage_json = r#"{
674 "packages": {
675 "test/local": {
676 "commit": null,
677 "remote": null,
678 "base_hash": "",
679 "latest_hash": "",
680 "paths": {}
681 }
682 },
683 "home": "/tmp/working_dir"
684 }"#;
685 storage
686 .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
687 .await?;
688
689 let domain_lineage_io = DomainLineageIo::new(paths.lineage());
690 let package = InstalledPackage {
691 lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
692 paths,
693 remote,
694 storage,
695 namespace,
696 };
697
698 package
699 .set_remote("my-bucket".to_string(), Some("example.com".parse()?))
700 .await?;
701
702 let lineage = package.lineage().await?;
703 let remote_uri = lineage
704 .remote_uri
705 .as_ref()
706 .expect("remote_uri should be set");
707 assert_eq!(
708 remote_uri.origin.as_ref().unwrap().to_string(),
709 "example.com"
710 );
711 assert_eq!(remote_uri.bucket, "my-bucket");
712 assert_eq!(remote_uri.hash, "");
713
714 Ok(())
715 }
716
717 #[test(tokio::test)]
718 async fn test_set_remote_empty_bucket_error() -> Res {
719 let (home, _temp_dir1) = Home::from_temp_dir()?;
720 let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
721
722 let storage = LocalStorage::new();
723 let remote = MockRemote::default();
724 let namespace: Namespace = ("test", "local").into();
725
726 paths
727 .scaffold_for_installing(&storage, &home, &namespace)
728 .await?;
729
730 let lineage_json = r#"{
731 "packages": {
732 "test/local": {
733 "commit": null,
734 "remote": null,
735 "base_hash": "",
736 "latest_hash": "",
737 "paths": {}
738 }
739 },
740 "home": "/tmp/working_dir"
741 }"#;
742 storage
743 .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
744 .await?;
745
746 let domain_lineage_io = DomainLineageIo::new(paths.lineage());
747 let package = InstalledPackage {
748 lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
749 paths,
750 remote,
751 storage,
752 namespace,
753 };
754
755 let result = package
756 .set_remote(String::new(), Some("example.com".parse()?))
757 .await;
758
759 assert!(result.is_err());
760 assert!(
761 result
762 .unwrap_err()
763 .to_string()
764 .contains("Bucket cannot be empty"),
765 "Error should mention empty bucket"
766 );
767
768 Ok(())
769 }
770
771 #[test(tokio::test)]
772 async fn test_set_remote_rejects_unreachable_bucket() -> Res {
773 use crate::error::RemoteCatalogError;
774
775 struct BadBucketRemote;
778
779 impl Remote for BadBucketRemote {
780 async fn exists(&self, _host: &Option<Host>, _s3_uri: &S3Uri) -> Res<bool> {
781 unreachable!("test only exercises verify_bucket")
782 }
783 async fn get_object_stream(
784 &self,
785 _host: &Option<Host>,
786 _s3_uri: &S3Uri,
787 ) -> Res<crate::io::remote::RemoteObjectStream> {
788 unreachable!("test only exercises verify_bucket")
789 }
790 async fn resolve_url(&self, _host: &Option<Host>, _s3_uri: &S3Uri) -> Res<S3Uri> {
791 unreachable!("test only exercises verify_bucket")
792 }
793 async fn put_object(
794 &self,
795 _host: &Option<Host>,
796 _s3_uri: &S3Uri,
797 _contents: impl Into<aws_sdk_s3::primitives::ByteStream>,
798 ) -> Res {
799 unreachable!("test only exercises verify_bucket")
800 }
801 async fn upload_file(
802 &self,
803 _host_config: &crate::io::remote::HostConfig,
804 _source_path: impl AsRef<std::path::Path>,
805 _dest_uri: &S3Uri,
806 _size: u64,
807 ) -> Res<(S3Uri, crate::checksum::ObjectHash)> {
808 unreachable!("test only exercises verify_bucket")
809 }
810 async fn host_config(
811 &self,
812 _host: &Option<Host>,
813 ) -> Res<crate::io::remote::HostConfig> {
814 Ok(crate::io::remote::HostConfig::default())
815 }
816 async fn verify_bucket(&self, bucket: &str) -> Res {
817 Err(RemoteCatalogError::BucketUnreachable(bucket.to_string()).into())
818 }
819 }
820
821 let (home, _temp_dir1) = Home::from_temp_dir()?;
822 let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
823
824 let storage = LocalStorage::new();
825 let namespace: Namespace = ("test", "badbucket").into();
826
827 paths
828 .scaffold_for_installing(&storage, &home, &namespace)
829 .await?;
830
831 let lineage_json = r#"{
832 "packages": {
833 "test/badbucket": {
834 "commit": null,
835 "remote": null,
836 "base_hash": "",
837 "latest_hash": "",
838 "paths": {}
839 }
840 },
841 "home": "/tmp/working_dir"
842 }"#;
843 storage
844 .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
845 .await?;
846
847 let domain_lineage_io = DomainLineageIo::new(paths.lineage());
848 let package = InstalledPackage {
849 lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
850 paths,
851 remote: BadBucketRemote,
852 storage,
853 namespace,
854 };
855
856 let result = package
857 .set_remote("typo-bucket".to_string(), Some("example.com".parse()?))
858 .await;
859
860 assert!(result.is_err());
861 let msg = result.unwrap_err().to_string();
862 assert!(
863 msg.contains("typo-bucket") && msg.contains("not reachable"),
864 "error should name the bucket and say it's unreachable, got: {msg}"
865 );
866
867 let lineage = package.lineage().await?;
870 assert!(
871 lineage.remote_uri.is_none(),
872 "remote_uri should not be persisted when verify_bucket fails",
873 );
874
875 Ok(())
876 }
877
878 #[test(tokio::test)]
879 async fn test_set_remote_rejects_change_on_pushed_package() -> Res {
880 let (home, _temp_dir1) = Home::from_temp_dir()?;
881 let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
882
883 let storage = LocalStorage::new();
884 let remote = MockRemote::default();
885 let namespace: Namespace = ("test", "overwrite").into();
886
887 paths
888 .scaffold_for_installing(&storage, &home, &namespace)
889 .await?;
890
891 let lineage_json = r#"{
892 "packages": {
893 "test/overwrite": {
894 "commit": null,
895 "remote": {
896 "bucket": "old-bucket",
897 "namespace": "test/overwrite",
898 "hash": "abc123",
899 "origin": "old.host"
900 },
901 "base_hash": "abc123",
902 "latest_hash": "abc123",
903 "paths": {}
904 }
905 },
906 "home": "/tmp/working_dir"
907 }"#;
908 storage
909 .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
910 .await?;
911
912 let domain_lineage_io = DomainLineageIo::new(paths.lineage());
913 let package = InstalledPackage {
914 lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
915 paths,
916 remote,
917 storage,
918 namespace,
919 };
920
921 let result = package
922 .set_remote("new-bucket".to_string(), Some("new.host".parse()?))
923 .await;
924
925 assert!(result.is_err());
926 assert!(
927 result
928 .unwrap_err()
929 .to_string()
930 .contains("Cannot change remote"),
931 "Should reject changing remote on a pushed package"
932 );
933
934 Ok(())
935 }
936
937 #[test(tokio::test)]
938 async fn test_set_remote_is_idempotent_on_pushed_package() -> Res {
939 let (home, _temp_dir1) = Home::from_temp_dir()?;
940 let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
941
942 let storage = LocalStorage::new();
943 let remote = MockRemote::default();
944 let namespace: Namespace = ("test", "idempotent").into();
945
946 paths
947 .scaffold_for_installing(&storage, &home, &namespace)
948 .await?;
949
950 let lineage_json = r#"{
951 "packages": {
952 "test/idempotent": {
953 "commit": null,
954 "remote": {
955 "bucket": "my-bucket",
956 "namespace": "test/idempotent",
957 "hash": "abc123",
958 "origin": "my.host"
959 },
960 "base_hash": "abc123",
961 "latest_hash": "abc123",
962 "paths": {}
963 }
964 },
965 "home": "/tmp/working_dir"
966 }"#;
967 storage
968 .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
969 .await?;
970
971 let domain_lineage_io = DomainLineageIo::new(paths.lineage());
972 let package = InstalledPackage {
973 lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
974 paths,
975 remote,
976 storage,
977 namespace,
978 };
979
980 package
982 .set_remote("my-bucket".to_string(), Some("my.host".parse()?))
983 .await?;
984
985 let lineage = package.lineage().await?;
986 let remote_uri = lineage
987 .remote_uri
988 .as_ref()
989 .expect("remote_uri should be set");
990 assert_eq!(remote_uri.hash, "abc123", "hash should be preserved");
991
992 Ok(())
993 }
994
995 #[test(tokio::test)]
996 async fn test_set_remote_overwrites_unpushed_remote() -> Res {
997 let (home, _temp_dir1) = Home::from_temp_dir()?;
998 let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
999
1000 let storage = LocalStorage::new();
1001 let remote = MockRemote::default();
1002 let namespace: Namespace = ("test", "unpushed").into();
1003
1004 paths
1005 .scaffold_for_installing(&storage, &home, &namespace)
1006 .await?;
1007
1008 let lineage_json = r#"{
1009 "packages": {
1010 "test/unpushed": {
1011 "commit": null,
1012 "remote": {
1013 "bucket": "old-bucket",
1014 "namespace": "test/unpushed",
1015 "hash": "",
1016 "origin": "old.host"
1017 },
1018 "base_hash": "",
1019 "latest_hash": "",
1020 "paths": {}
1021 }
1022 },
1023 "home": "/tmp/working_dir"
1024 }"#;
1025 storage
1026 .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
1027 .await?;
1028
1029 let domain_lineage_io = DomainLineageIo::new(paths.lineage());
1030 let package = InstalledPackage {
1031 lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
1032 paths,
1033 remote,
1034 storage,
1035 namespace,
1036 };
1037
1038 package
1039 .set_remote("new-bucket".to_string(), Some("new.host".parse()?))
1040 .await?;
1041
1042 let lineage = package.lineage().await?;
1043 let remote_uri = lineage
1044 .remote_uri
1045 .as_ref()
1046 .expect("remote_uri should be set");
1047 assert_eq!(remote_uri.origin.as_ref().unwrap().to_string(), "new.host");
1048 assert_eq!(remote_uri.bucket, "new-bucket");
1049 assert_eq!(remote_uri.hash, "", "hash should remain empty");
1050
1051 Ok(())
1052 }
1053
1054 #[test(tokio::test)]
1055 async fn test_manifest_recovery_from_corruption() -> Res {
1056 let (home, _temp_dir1) = Home::from_temp_dir()?;
1057 let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
1058
1059 let storage = LocalStorage::new();
1060 let remote = MockRemote::default();
1061 let namespace: Namespace = ("test", "recovery").into();
1062 let test_hash = "deadbeef".to_string();
1063
1064 paths
1065 .scaffold_for_installing(&storage, &home, &namespace)
1066 .await?;
1067 paths.scaffold_for_caching(&storage, "test-bucket").await?;
1068
1069 let lineage_json = format!(
1071 r#"{{
1072 "packages": {{
1073 "test/recovery": {{
1074 "commit": null,
1075 "remote": {{
1076 "bucket": "test-bucket",
1077 "namespace": "test/recovery",
1078 "hash": "{}",
1079 "catalog": null
1080 }},
1081 "base_hash": "{}",
1082 "latest_hash": "{}",
1083 "paths": {{}}
1084 }}}},
1085 "home": "/tmp/working_dir"
1086 }}"#,
1087 test_hash, "foo", "bar"
1088 );
1089 storage
1090 .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
1091 .await?;
1092
1093 let reference_manifest = crate::fixtures::manifest::path();
1095 let manifest_uri = ManifestUri {
1096 bucket: "test-bucket".to_string(),
1097 namespace: namespace.clone(),
1098 hash: test_hash.clone(),
1099 origin: None,
1100 };
1101 let cached_manifest = paths.cached_manifest(&manifest_uri);
1102 storage.copy(reference_manifest?, cached_manifest).await?;
1103
1104 let installed_manifest = paths.installed_manifest(&namespace, &test_hash);
1106 storage
1107 .write_byte_stream(
1108 &installed_manifest,
1109 ByteStream::from_static(b"corrupted data"),
1110 )
1111 .await?;
1112
1113 let domain_lineage_io = DomainLineageIo::new(paths.lineage());
1114 let package = InstalledPackage {
1115 lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
1116 paths,
1117 remote,
1118 storage: storage.clone(),
1119 namespace,
1120 };
1121
1122 let result = package.manifest().await;
1124 assert!(
1125 result.is_ok(),
1126 "Should recover from cache when installed is corrupted"
1127 );
1128
1129 let fixed_manifest_content = storage.read_bytes(&installed_manifest).await?;
1131 assert!(
1132 fixed_manifest_content.len() > 10,
1133 "Installed manifest should be fixed"
1134 );
1135 assert!(
1136 !fixed_manifest_content.starts_with(b"corrupted"),
1137 "Should no longer be corrupted"
1138 );
1139
1140 Ok(())
1141 }
1142
1143 #[test(tokio::test)]
1144 async fn test_set_remote_recommits_existing_commit() -> Res {
1145 let (home, _temp_dir1) = Home::from_temp_dir()?;
1146 let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
1147
1148 let storage = LocalStorage::new();
1149 let remote = MockRemote::default();
1150 let namespace: Namespace = ("test", "recommit").into();
1151
1152 paths
1153 .scaffold_for_installing(&storage, &home, &namespace)
1154 .await?;
1155
1156 let lineage_json = r#"{
1158 "packages": {
1159 "test/recommit": {
1160 "commit": null,
1161 "remote": null,
1162 "base_hash": "",
1163 "latest_hash": "",
1164 "paths": {}
1165 }
1166 },
1167 "home": "/tmp/working_dir"
1168 }"#;
1169 storage
1170 .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
1171 .await?;
1172
1173 let package_home = home.join(namespace.to_string());
1175 storage.create_dir_all(&package_home).await?;
1176 storage
1177 .write_byte_stream(
1178 package_home.join("data.txt"),
1179 ByteStream::from_static(b"hello world"),
1180 )
1181 .await?;
1182
1183 let domain_lineage_io = DomainLineageIo::new(paths.lineage());
1184 let package = InstalledPackage {
1185 lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
1186 paths,
1187 remote,
1188 storage,
1189 namespace: namespace.clone(),
1190 };
1191
1192 let commit = package
1194 .commit(
1195 "Initial commit".to_string(),
1196 Some(serde_json::json!({"key": "value"})),
1197 None,
1198 None,
1199 )
1200 .await?;
1201 let hash_before = commit.hash.clone();
1202
1203 package
1208 .set_remote("my-bucket".to_string(), Some("example.com".parse()?))
1209 .await?;
1210
1211 let lineage = package.lineage().await?;
1212
1213 let remote_uri = lineage
1215 .remote_uri
1216 .as_ref()
1217 .expect("remote_uri should be set");
1218 assert_eq!(
1219 remote_uri.origin.as_ref().unwrap().to_string(),
1220 "example.com"
1221 );
1222 assert_eq!(remote_uri.bucket, "my-bucket");
1223
1224 let new_commit = lineage.commit.as_ref().expect("commit should exist");
1226 assert_eq!(
1227 new_commit.prev_hashes.first(),
1228 Some(&hash_before),
1229 "Old hash should be in prev_hashes after recommit"
1230 );
1231
1232 let manifest_path = package
1234 .paths
1235 .installed_manifest(&namespace, &new_commit.hash);
1236 let manifest = Manifest::from_path(&package.storage, &manifest_path).await?;
1237 assert_eq!(
1238 manifest.header.message,
1239 Some("Initial commit".to_string()),
1240 "Message should be preserved after recommit"
1241 );
1242 assert_eq!(
1243 manifest.header.user_meta,
1244 Some(serde_json::json!({"key": "value"})),
1245 "User meta should be preserved after recommit"
1246 );
1247
1248 Ok(())
1249 }
1250
1251 struct LoggedOutRemote;
1253
1254 impl crate::io::remote::Remote for LoggedOutRemote {
1255 async fn exists(&self, _host: &Option<Host>, _s3_uri: &S3Uri) -> Res<bool> {
1256 Err(Error::Login(LoginError::Required(None)))
1257 }
1258 async fn get_object_stream(
1259 &self,
1260 _host: &Option<Host>,
1261 _s3_uri: &S3Uri,
1262 ) -> Res<crate::io::remote::RemoteObjectStream> {
1263 Err(Error::Login(LoginError::Required(None)))
1264 }
1265 async fn resolve_url(&self, _host: &Option<Host>, _s3_uri: &S3Uri) -> Res<S3Uri> {
1266 Err(Error::Login(LoginError::Required(None)))
1267 }
1268 async fn put_object(
1269 &self,
1270 _host: &Option<Host>,
1271 _s3_uri: &S3Uri,
1272 _contents: impl Into<aws_sdk_s3::primitives::ByteStream>,
1273 ) -> Res {
1274 Err(Error::Login(LoginError::Required(None)))
1275 }
1276 async fn upload_file(
1277 &self,
1278 _host_config: &crate::io::remote::HostConfig,
1279 _source_path: impl AsRef<std::path::Path>,
1280 _dest_uri: &S3Uri,
1281 _size: u64,
1282 ) -> Res<(S3Uri, crate::checksum::ObjectHash)> {
1283 Err(Error::Login(LoginError::Required(None)))
1284 }
1285 async fn host_config(&self, _host: &Option<Host>) -> Res<crate::io::remote::HostConfig> {
1286 Ok(crate::io::remote::HostConfig::default())
1287 }
1288 async fn verify_bucket(&self, _bucket: &str) -> Res {
1289 Ok(())
1290 }
1291 }
1292
1293 #[test(tokio::test)]
1294 async fn test_status_propagates_login_required() -> Res {
1295 let (home, _temp_dir1) = Home::from_temp_dir()?;
1296 let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
1297
1298 let storage = LocalStorage::new();
1299 let namespace: Namespace = ("test", "needslogin").into();
1300
1301 paths
1302 .scaffold_for_installing(&storage, &home, &namespace)
1303 .await?;
1304
1305 let lineage_json = r#"{
1307 "packages": {
1308 "test/needslogin": {
1309 "commit": null,
1310 "remote": {
1311 "bucket": "my-bucket",
1312 "namespace": "test/needslogin",
1313 "hash": "",
1314 "origin": "nightly.quilttest.com"
1315 },
1316 "base_hash": "",
1317 "latest_hash": "",
1318 "paths": {}
1319 }
1320 },
1321 "home": "/tmp/working_dir"
1322 }"#;
1323 storage
1324 .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
1325 .await?;
1326
1327 let domain_lineage_io = DomainLineageIo::new(paths.lineage());
1328 let package = InstalledPackage {
1329 lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
1330 paths,
1331 remote: LoggedOutRemote,
1332 storage,
1333 namespace,
1334 };
1335
1336 let result = package.status(None).await;
1338 assert!(
1339 matches!(result, Err(Error::Login(LoginError::Required(_)))),
1340 "Expected LoginRequired error, got: {result:?}"
1341 );
1342
1343 Ok(())
1344 }
1345}