Skip to main content

quilt_rs/
installed_package.rs

1use std::collections::BTreeMap;
2use std::path::PathBuf;
3
4use tracing::log;
5
6use crate::Error;
7use crate::Res;
8use crate::error::LoginError;
9use crate::error::PackageOpError;
10use crate::flow;
11use crate::flow::cache_remote_manifest;
12use crate::io::remote::HostConfig;
13use crate::io::remote::Remote;
14use crate::io::remote::RemoteS3;
15use crate::io::remote::resolve_workflow;
16use crate::io::storage::LocalStorage;
17use crate::io::storage::Storage;
18use crate::lineage;
19use crate::lineage::CommitState;
20use crate::lineage::InstalledPackageStatus;
21use crate::lineage::LineagePaths;
22use crate::manifest::Manifest;
23use crate::manifest::Workflow;
24use crate::paths;
25use crate::paths::copy_cached_to_installed;
26use quilt_uri::Host;
27use quilt_uri::ManifestUri;
28use quilt_uri::Namespace;
29use quilt_uri::S3Uri;
30use quilt_uri::UriError;
31
/// Result of a push operation visible to callers outside `quilt-rs`.
///
/// Returned by `InstalledPackage::push` and carried inside
/// [`PublishOutcome`] by `InstalledPackage::publish`.
pub struct PushOutcome {
    /// Remote manifest URI taken from the lineage that was written back
    /// after the push finished.
    pub manifest_uri: ManifestUri,
    /// Whether the pushed revision was certified as "latest".
    /// `false` when the remote's latest tag moved since we last checked
    /// (i.e. someone else pushed in the meantime).
    pub certified_latest: bool,
}
40
/// Result of a publish operation visible to callers outside `quilt-rs`.
/// Alias of [`flow::PublishOutcome`] parameterized over the public
/// [`PushOutcome`], so external callers see a non-generic type name.
///
/// The variants used in this file are `CommittedAndPushed` (a commit was
/// created before pushing) and `PushedOnly`.
pub type PublishOutcome = flow::PublishOutcome<PushOutcome>;
45
/// Similar to `LocalDomain` because it has access to the same lineage file and remote/storage
/// traits.
/// But it only manages one particular installed package.
/// It can be instantiated from `LocalDomain` by installing new or listing existing packages.
///
/// The type parameters default to `LocalStorage` and `RemoteS3`; tests swap
/// in mock implementations.
#[derive(Debug)]
pub struct InstalledPackage<S: Storage = LocalStorage, R: Remote = RemoteS3> {
    /// I/O handle used to read and write this package's lineage record.
    pub lineage: lineage::PackageLineageIo,
    /// Domain path layout: locates installed manifests and scaffolds the
    /// install/cache directories.
    pub paths: paths::DomainPaths,
    /// Remote backend: queried for host configuration and bucket
    /// verification, and handed to the `flow` operations that need it.
    pub remote: R,
    /// Storage backend passed to every lineage, manifest and `flow` call.
    pub storage: S,
    /// Namespace identifying the package this value manages.
    pub namespace: Namespace,
}
58
59impl std::fmt::Display for InstalledPackage {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        write!(f, r#"Installed package "{}""#, self.namespace)
62    }
63}
64
65impl<S: Storage + Sync, R: Remote> InstalledPackage<S, R> {
66    pub async fn scaffold_paths(&self) -> Res {
67        let home = self.lineage.domain_home(&self.storage).await?;
68        self.paths
69            .scaffold_for_installing(&self.storage, &home, &self.namespace)
70            .await
71    }
72
73    pub async fn scaffold_paths_for_caching(&self, bucket: &str) -> Res {
74        self.paths.scaffold_for_caching(&self.storage, bucket).await
75    }
76
77    pub async fn manifest(&self) -> Res<Manifest> {
78        let (_, lineage) = self.lineage.read(&self.storage).await?;
79        let hash = match lineage.current_hash() {
80            Some(h) => h,
81            None => return Ok(Manifest::default()),
82        };
83        let installed_path = self.paths.installed_manifest(&self.namespace, hash);
84        match Manifest::from_path(&self.storage, &installed_path).await {
85            Ok(manifest) => return Ok(manifest),
86
87            Err(e) => {
88                log::warn!(
89                    "Failed to read installed manifest at {}: {}",
90                    installed_path.display(),
91                    e
92                );
93            }
94        }
95
96        // If installed failed, try to recover from cache (only if we have a remote)
97        match lineage.remote_uri.as_ref() {
98            Some(remote_uri) => {
99                log::info!("Attempting to recover from cache at {remote_uri}");
100                let cached_manifest =
101                    cache_remote_manifest(&self.paths, &self.storage, &self.remote, remote_uri)
102                        .await?;
103                copy_cached_to_installed(&self.paths, &self.storage, remote_uri).await?;
104                Ok(cached_manifest)
105            }
106            None => Err(Error::Uri(UriError::ManifestPath(
107                "No installed manifest and no remote to recover from".to_string(),
108            ))),
109        }
110    }
111
112    pub async fn lineage(&self) -> Res<lineage::PackageLineage> {
113        let (_, lineage) = self.lineage.read(&self.storage).await?;
114        Ok(lineage)
115    }
116
117    pub async fn package_home(&self) -> Res<PathBuf> {
118        self.lineage.package_home(&self.storage).await
119    }
120
121    pub async fn status(&self, host_config_opt: Option<HostConfig>) -> Res<InstalledPackageStatus> {
122        let (package_home, lineage) = self.lineage.read(&self.storage).await?;
123
124        // Only refresh latest hash if we have a remote
125        let lineage = match lineage.remote_uri.as_ref() {
126            Some(_) => match flow::refresh_latest_hash(lineage.clone(), &self.remote).await {
127                Ok(lineage) => lineage,
128                Err(Error::Login(LoginError::Required(_))) => {
129                    return Err(Error::Login(LoginError::Required(
130                        lineage.remote_uri.as_ref().and_then(|r| r.origin.clone()),
131                    )));
132                }
133                Err(err) => {
134                    log::warn!("Failed to refresh latest hash: {err}");
135                    lineage
136                }
137            },
138            None => lineage,
139        };
140        let manifest = self.manifest().await?;
141
142        let host_config = match host_config_opt {
143            Some(hc) => hc,
144            None => match lineage.remote_uri.as_ref() {
145                Some(remote_uri) if !remote_uri.bucket.is_empty() => {
146                    self.remote.host_config(&remote_uri.origin).await?
147                }
148                _ => HostConfig::default(),
149            },
150        };
151
152        let (lineage, status) = flow::status(
153            lineage,
154            &self.storage,
155            &manifest,
156            &package_home,
157            host_config,
158        )
159        .await?;
160        self.lineage.write(&self.storage, lineage).await?;
161        Ok(status)
162    }
163
164    pub async fn install_paths(&self, paths: &[PathBuf]) -> Res<LineagePaths> {
165        if paths.is_empty() {
166            return Ok(BTreeMap::new());
167        }
168
169        self.scaffold_paths().await?;
170
171        let (package_home, lineage) = self.lineage.read(&self.storage).await?;
172        let remote_uri = lineage.remote()?;
173
174        self.scaffold_paths_for_caching(&remote_uri.bucket).await?;
175
176        let mut manifest = self.manifest().await?;
177        let lineage = flow::install_paths(
178            lineage,
179            &mut manifest,
180            &self.paths,
181            package_home,
182            self.namespace.clone(),
183            &self.storage,
184            &self.remote,
185            &paths.iter().collect::<Vec<&PathBuf>>(),
186        )
187        .await?;
188        let lineage = self.lineage.write(&self.storage, lineage).await?;
189        Ok(lineage.paths)
190    }
191
192    pub async fn uninstall_paths(&self, paths: &Vec<PathBuf>) -> Res<LineagePaths> {
193        let (package_home, lineage) = self.lineage.read(&self.storage).await?;
194        let lineage = flow::uninstall_paths(lineage, package_home, &self.storage, paths).await?;
195        let lineage = self.lineage.write(&self.storage, lineage).await?;
196        Ok(lineage.paths)
197    }
198
    /// Placeholder for reverting the given paths; not implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()` after logging the requested
    /// paths at debug level.
    pub async fn revert_paths(&self, paths: &Vec<String>) -> Res {
        log::debug!("revert_paths: {paths:?}");
        unimplemented!()
    }
203
204    pub async fn commit(
205        &self,
206        message: String,
207        user_meta: Option<serde_json::Value>,
208        workflow: Option<Workflow>,
209        host_config_opt: Option<HostConfig>,
210    ) -> Res<CommitState> {
211        self.scaffold_paths().await?;
212
213        let (package_home, lineage) = self.lineage.read(&self.storage).await?;
214        let mut manifest = self.manifest().await?;
215
216        let host_config = match host_config_opt {
217            Some(hc) => hc,
218            None => match lineage.remote_uri.as_ref() {
219                Some(remote_uri) if !remote_uri.bucket.is_empty() => {
220                    self.remote.host_config(&remote_uri.origin).await?
221                }
222                _ => HostConfig::default(),
223            },
224        };
225
226        let (lineage, status) = flow::status(
227            lineage,
228            &self.storage,
229            &manifest,
230            &package_home,
231            host_config,
232        )
233        .await?;
234
235        let (lineage, commit) = flow::commit(
236            lineage,
237            &mut manifest,
238            &self.paths,
239            &self.storage,
240            package_home,
241            status,
242            self.namespace.clone(),
243            message,
244            user_meta,
245            workflow,
246        )
247        .await?;
248        self.lineage.write(&self.storage, lineage).await?;
249        Ok(commit)
250    }
251
252    /// Commit any working-directory changes (if any) and push the revision to
253    /// the remote in one step. Errors if the package has no remote or nothing
254    /// to publish.
255    ///
256    /// `status_opt` is a caller-provided cache of `flow::status`: when
257    /// `Some`, this method reuses it verbatim instead of re-scanning the
258    /// working tree. The caller must ensure the status was computed from the
259    /// same on-disk lineage and manifest that `publish` will re-read — i.e.
260    /// nothing else should have mutated this package between the two calls.
261    /// Passing `None` is always safe and falls back to an internal
262    /// `flow::status` call.
263    pub async fn publish(
264        &self,
265        message: String,
266        user_meta: Option<serde_json::Value>,
267        workflow: Option<Workflow>,
268        host_config_opt: Option<HostConfig>,
269        status_opt: Option<InstalledPackageStatus>,
270    ) -> Res<PublishOutcome> {
271        self.scaffold_paths().await?;
272
273        let (package_home, lineage) = self.lineage.read(&self.storage).await?;
274        let remote_uri = match lineage.remote_uri.as_ref() {
275            Some(uri) if !uri.bucket.is_empty() => uri.clone(),
276            Some(_) => {
277                return Err(Error::PackageOp(PackageOpError::Publish(
278                    "Remote bucket not set. Use set_remote first.".to_string(),
279                )));
280            }
281            None => {
282                return Err(Error::PackageOp(PackageOpError::Publish(
283                    "No remote configured. Use set_remote first.".to_string(),
284                )));
285            }
286        };
287
288        self.scaffold_paths_for_caching(&remote_uri.bucket).await?;
289
290        let mut manifest = self.manifest().await?;
291        let host_config =
292            host_config_opt.unwrap_or(self.remote.host_config(&remote_uri.origin).await?);
293
294        let (lineage, status) = match status_opt {
295            Some(status) => (lineage, status),
296            None => {
297                flow::status(
298                    lineage,
299                    &self.storage,
300                    &manifest,
301                    &package_home,
302                    host_config.clone(),
303                )
304                .await?
305            }
306        };
307
308        let outcome = flow::publish(
309            lineage,
310            &mut manifest,
311            &self.paths,
312            &self.storage,
313            &self.remote,
314            package_home,
315            status,
316            self.namespace.clone(),
317            host_config,
318            flow::CommitOptions {
319                message,
320                user_meta,
321                workflow,
322            },
323        )
324        .await?;
325
326        let (committed, push_result) = match outcome {
327            flow::PublishOutcome::CommittedAndPushed(p) => (true, p),
328            flow::PublishOutcome::PushedOnly(p) => (false, p),
329        };
330        let certified_latest = push_result.certified_latest;
331        let lineage = self
332            .lineage
333            .write(&self.storage, push_result.lineage)
334            .await?;
335        let push = PushOutcome {
336            manifest_uri: lineage.remote()?.clone(),
337            certified_latest,
338        };
339        Ok(if committed {
340            PublishOutcome::CommittedAndPushed(push)
341        } else {
342            PublishOutcome::PushedOnly(push)
343        })
344    }
345
346    /// Push the local revision to the remote.
347    pub async fn push(&self, host_config_opt: Option<HostConfig>) -> Res<PushOutcome> {
348        self.scaffold_paths().await?;
349
350        let (_, lineage) = self.lineage.read(&self.storage).await?;
351        let remote_uri = match lineage.remote_uri.as_ref() {
352            Some(uri) if !uri.bucket.is_empty() => uri.clone(),
353            Some(_) => {
354                return Err(Error::PackageOp(PackageOpError::Push(
355                    "Remote bucket not set. Use set_remote first.".to_string(),
356                )));
357            }
358            None => {
359                return Err(Error::PackageOp(PackageOpError::Push(
360                    "No remote configured. Use set_remote first.".to_string(),
361                )));
362            }
363        };
364
365        if lineage.commit.is_none() {
366            return Err(Error::PackageOp(PackageOpError::Push(
367                "No commits to push".to_string(),
368            )));
369        }
370
371        self.scaffold_paths_for_caching(&remote_uri.bucket).await?;
372
373        let manifest = self.manifest().await?;
374
375        let host_config =
376            host_config_opt.unwrap_or(self.remote.host_config(&remote_uri.origin).await?);
377
378        let result = flow::push(
379            lineage,
380            manifest,
381            &self.paths,
382            &self.storage,
383            &self.remote,
384            Some(self.namespace.clone()),
385            host_config,
386        )
387        .await?;
388        let certified_latest = result.certified_latest;
389        let lineage = self.lineage.write(&self.storage, result.lineage).await?;
390        Ok(PushOutcome {
391            manifest_uri: lineage.remote()?.clone(),
392            certified_latest,
393        })
394    }
395
396    pub async fn pull(&self, host_config_opt: Option<HostConfig>) -> Res<ManifestUri> {
397        self.scaffold_paths().await?;
398
399        let (package_home, lineage) = self.lineage.read(&self.storage).await?;
400        let remote_uri = lineage.remote()?.clone();
401
402        self.scaffold_paths_for_caching(&remote_uri.bucket).await?;
403
404        let mut manifest = self.manifest().await?;
405
406        let host_config =
407            host_config_opt.unwrap_or(self.remote.host_config(&remote_uri.origin).await?);
408
409        let (lineage, status) = flow::status(
410            lineage,
411            &self.storage,
412            &manifest,
413            &package_home,
414            host_config,
415        )
416        .await?;
417        let lineage = flow::pull(
418            lineage,
419            &mut manifest,
420            &self.paths,
421            &self.storage,
422            &self.remote,
423            package_home,
424            status,
425            self.namespace.clone(),
426        )
427        .await?;
428        let lineage = self.lineage.write(&self.storage, lineage).await?;
429        Ok(lineage.remote()?.clone())
430    }
431
432    pub async fn certify_latest(&self) -> Res<ManifestUri> {
433        let (_, lineage) = self.lineage.read(&self.storage).await?;
434        let latest_manifest_uri = lineage.remote()?.clone();
435        let lineage = flow::certify_latest(lineage, &self.remote, latest_manifest_uri).await?;
436        let lineage = self.lineage.write(&self.storage, lineage).await?;
437        Ok(lineage.remote()?.clone())
438    }
439
440    pub async fn reset_to_latest(&self) -> Res<ManifestUri> {
441        self.scaffold_paths().await?;
442
443        let (package_home, lineage) = self.lineage.read(&self.storage).await?;
444        let remote_uri = lineage.remote()?.clone();
445
446        self.scaffold_paths_for_caching(&remote_uri.bucket).await?;
447
448        let mut manifest = self.manifest().await?;
449        let lineage = flow::reset_to_latest(
450            lineage,
451            &mut manifest,
452            &self.paths,
453            &self.storage,
454            &self.remote,
455            package_home,
456            self.namespace.clone(),
457        )
458        .await?;
459        let lineage = self.lineage.write(&self.storage, lineage).await?;
460        Ok(lineage.remote()?.clone())
461    }
462
463    pub async fn set_remote(&self, bucket: String, origin: Option<Host>) -> Res {
464        if bucket.is_empty() {
465            return Err(Error::PackageOp(PackageOpError::Push(
466                "Bucket cannot be empty".to_string(),
467            )));
468        }
469        let (_, mut lineage) = self.lineage.read(&self.storage).await?;
470        if let Some(existing) = &lineage.remote_uri
471            && !existing.hash.is_empty()
472        {
473            let same_remote = existing.bucket == bucket && existing.origin == origin;
474            if same_remote {
475                return Ok(());
476            }
477            return Err(Error::PackageOp(PackageOpError::Push(
478                "Cannot change remote on a package that has already been pushed".to_string(),
479            )));
480        }
481        // Validate the bucket up front so a typo surfaces here instead of
482        // later at push time as an opaque S3 routing error. This is an
483        // unauthenticated HEAD against s3.amazonaws.com — works even when
484        // the user hasn't logged into the catalog yet.
485        self.remote.verify_bucket(&bucket).await?;
486        lineage.remote_uri = Some(ManifestUri {
487            origin: origin.clone(),
488            bucket: bucket.clone(),
489            namespace: self.namespace.clone(),
490            hash: String::new(),
491        });
492        // Persist remote_uri first — if recommit fails (e.g. network error),
493        // the remote is still saved and the user can retry.
494        self.lineage.write(&self.storage, lineage.clone()).await?;
495
496        // Re-commit with the remote's host_config and workflow so push
497        // works immediately without a manual re-commit.
498        // This can fail (e.g. not logged in yet) — the remote is already saved,
499        // so we log a warning and let the user push after logging in.
500        if let Some(origin) = origin
501            && lineage.commit.is_some()
502            && let Err(err) = self.recommit_for_remote(lineage, origin, bucket).await
503        {
504            log::warn!("Remote saved but recommit failed (will retry on push): {err}");
505        }
506
507        Ok(())
508    }
509
510    async fn recommit_for_remote(
511        &self,
512        lineage: lineage::PackageLineage,
513        origin: Host,
514        bucket: String,
515    ) -> Res {
516        let host_config = self.remote.host_config(&Some(origin.clone())).await?;
517        let workflows_config_uri = S3Uri {
518            key: ".quilt/workflows/config.yml".to_string(),
519            bucket,
520            version: None,
521        };
522        let workflow =
523            resolve_workflow(&self.remote, &Some(origin), None, &workflows_config_uri).await?;
524        let manifest = self.manifest().await?;
525        let lineage = flow::recommit(
526            lineage,
527            &manifest,
528            &self.paths,
529            &self.storage,
530            self.namespace.clone(),
531            host_config,
532            workflow,
533        )
534        .await?;
535        self.lineage.write(&self.storage, lineage).await?;
536        Ok(())
537    }
538
539    pub async fn resolve_workflow(&self, workflow_id: Option<String>) -> Res<Option<Workflow>> {
540        let (_, lineage) = self.lineage.read(&self.storage).await?;
541        let remote_uri = match lineage.remote_uri.as_ref() {
542            Some(uri) if !uri.bucket.is_empty() => uri.clone(),
543            _ => return Ok(None),
544        };
545        let workflows_config_uri = S3Uri {
546            key: ".quilt/workflows/config.yml".to_string(),
547            ..S3Uri::from(remote_uri.clone())
548        };
549        resolve_workflow(
550            &self.remote,
551            &remote_uri.origin,
552            workflow_id,
553            &workflows_config_uri,
554        )
555        .await
556    }
557}
558
559#[cfg(test)]
560mod tests {
561    use super::*;
562
563    use test_log::test;
564
565    use aws_sdk_s3::primitives::ByteStream;
566
567    use crate::io::remote::mocks::MockRemote;
568    use crate::io::storage::StorageExt;
569    use crate::lineage::DomainLineageIo;
570    use crate::lineage::Home;
571    use crate::lineage::PackageLineageIo;
572    use crate::paths::DomainPaths;
573
574    #[test(tokio::test)]
575    async fn test_spamming_commit_writes() -> Res {
576        let (home, _temp_dir1) = Home::from_temp_dir()?;
577        let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
578
579        let storage = LocalStorage::new();
580        let remote = MockRemote::default();
581        let namespace: Namespace = ("test", "history").into();
582        let test_hash = "deadbeef".to_string();
583
584        paths
585            .scaffold_for_installing(&storage, &home, &namespace)
586            .await?;
587        // Initialize domain lineage file
588        let lineage_json = format!(
589            r#"{{
590                "packages": {{
591                    "test/history": {{
592                        "commit": null,
593                        "remote": {{
594                            "bucket": "bucket",
595                            "namespace": "test/history",
596                            "hash": "{}",
597                            "catalog": "test.quilt.dev"
598                        }},
599                        "base_hash": "{}",
600                        "latest_hash": "{}",
601                        "paths": {{}}
602                    }}}},
603                "home": "/tmp/working_dir"
604                }}"#,
605            test_hash, "foo", "bar"
606        );
607        storage
608            .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
609            .await?;
610
611        // Copy manifest to the expected path
612        let test_manifest_path = paths.installed_manifest(&namespace, &test_hash);
613        let test_manifest = r#"{"version": "v0"}"#;
614        storage
615            .write_byte_stream(
616                &test_manifest_path,
617                ByteStream::from_static(test_manifest.as_bytes()),
618            )
619            .await?;
620
621        let domain_lineage_io = DomainLineageIo::new(paths.lineage());
622
623        let package = InstalledPackage {
624            lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
625            paths,
626            remote,
627            storage,
628            namespace,
629        };
630
631        // Make 10 commits with different content
632        let mut expected_hashes = Vec::new();
633        for i in 0..10 {
634            let commit = package
635                .commit(
636                    format!("Commit new1 {i}"),
637                    Some(serde_json::json!({ "count": i })),
638                    None,
639                    None,
640                )
641                .await?;
642            expected_hashes.insert(i, commit.hash);
643        }
644
645        // Remove last, cause it's the "current" hash, not a part of `prev_hashes`
646        expected_hashes.pop();
647
648        let commit_state = package.lineage().await?.commit.unwrap();
649
650        assert_eq!(commit_state.prev_hashes.len(), 9);
651        // let hashes_to_assert: Vec<String> = expected_hashes.into_iter().rev().collect();
652        assert_eq!(
653            commit_state.prev_hashes,
654            expected_hashes.into_iter().rev().collect::<Vec<String>>()
655        );
656
657        Ok(())
658    }
659
660    #[test(tokio::test)]
661    async fn test_set_remote_on_local_package() -> Res {
662        let (home, _temp_dir1) = Home::from_temp_dir()?;
663        let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
664
665        let storage = LocalStorage::new();
666        let remote = MockRemote::default();
667        let namespace: Namespace = ("test", "local").into();
668
669        paths
670            .scaffold_for_installing(&storage, &home, &namespace)
671            .await?;
672
673        let lineage_json = r#"{
674            "packages": {
675                "test/local": {
676                    "commit": null,
677                    "remote": null,
678                    "base_hash": "",
679                    "latest_hash": "",
680                    "paths": {}
681                }
682            },
683            "home": "/tmp/working_dir"
684        }"#;
685        storage
686            .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
687            .await?;
688
689        let domain_lineage_io = DomainLineageIo::new(paths.lineage());
690        let package = InstalledPackage {
691            lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
692            paths,
693            remote,
694            storage,
695            namespace,
696        };
697
698        package
699            .set_remote("my-bucket".to_string(), Some("example.com".parse()?))
700            .await?;
701
702        let lineage = package.lineage().await?;
703        let remote_uri = lineage
704            .remote_uri
705            .as_ref()
706            .expect("remote_uri should be set");
707        assert_eq!(
708            remote_uri.origin.as_ref().unwrap().to_string(),
709            "example.com"
710        );
711        assert_eq!(remote_uri.bucket, "my-bucket");
712        assert_eq!(remote_uri.hash, "");
713
714        Ok(())
715    }
716
717    #[test(tokio::test)]
718    async fn test_set_remote_empty_bucket_error() -> Res {
719        let (home, _temp_dir1) = Home::from_temp_dir()?;
720        let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
721
722        let storage = LocalStorage::new();
723        let remote = MockRemote::default();
724        let namespace: Namespace = ("test", "local").into();
725
726        paths
727            .scaffold_for_installing(&storage, &home, &namespace)
728            .await?;
729
730        let lineage_json = r#"{
731            "packages": {
732                "test/local": {
733                    "commit": null,
734                    "remote": null,
735                    "base_hash": "",
736                    "latest_hash": "",
737                    "paths": {}
738                }
739            },
740            "home": "/tmp/working_dir"
741        }"#;
742        storage
743            .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
744            .await?;
745
746        let domain_lineage_io = DomainLineageIo::new(paths.lineage());
747        let package = InstalledPackage {
748            lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
749            paths,
750            remote,
751            storage,
752            namespace,
753        };
754
755        let result = package
756            .set_remote(String::new(), Some("example.com".parse()?))
757            .await;
758
759        assert!(result.is_err());
760        assert!(
761            result
762                .unwrap_err()
763                .to_string()
764                .contains("Bucket cannot be empty"),
765            "Error should mention empty bucket"
766        );
767
768        Ok(())
769    }
770
    /// `set_remote` must fail, and must not persist a remote, when the
    /// remote's `verify_bucket` reports the bucket as unreachable.
    #[test(tokio::test)]
    async fn test_set_remote_rejects_unreachable_bucket() -> Res {
        use crate::error::RemoteCatalogError;

        /// Remote that rejects any verify_bucket call — models the case
        /// where the user typed a bucket that doesn't resolve on S3.
        struct BadBucketRemote;

        // Only `verify_bucket` (always fails) and `host_config` (returns the
        // default) have real bodies; every other method is `unreachable!`
        // because this test is not expected to call it.
        impl Remote for BadBucketRemote {
            async fn exists(&self, _host: &Option<Host>, _s3_uri: &S3Uri) -> Res<bool> {
                unreachable!("test only exercises verify_bucket")
            }
            async fn get_object_stream(
                &self,
                _host: &Option<Host>,
                _s3_uri: &S3Uri,
            ) -> Res<crate::io::remote::RemoteObjectStream> {
                unreachable!("test only exercises verify_bucket")
            }
            async fn resolve_url(&self, _host: &Option<Host>, _s3_uri: &S3Uri) -> Res<S3Uri> {
                unreachable!("test only exercises verify_bucket")
            }
            async fn put_object(
                &self,
                _host: &Option<Host>,
                _s3_uri: &S3Uri,
                _contents: impl Into<aws_sdk_s3::primitives::ByteStream>,
            ) -> Res {
                unreachable!("test only exercises verify_bucket")
            }
            async fn upload_file(
                &self,
                _host_config: &crate::io::remote::HostConfig,
                _source_path: impl AsRef<std::path::Path>,
                _dest_uri: &S3Uri,
                _size: u64,
            ) -> Res<(S3Uri, crate::checksum::ObjectHash)> {
                unreachable!("test only exercises verify_bucket")
            }
            async fn host_config(
                &self,
                _host: &Option<Host>,
            ) -> Res<crate::io::remote::HostConfig> {
                Ok(crate::io::remote::HostConfig::default())
            }
            async fn verify_bucket(&self, bucket: &str) -> Res {
                Err(RemoteCatalogError::BucketUnreachable(bucket.to_string()).into())
            }
        }

        let (home, _temp_dir1) = Home::from_temp_dir()?;
        let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;

        let storage = LocalStorage::new();
        let namespace: Namespace = ("test", "badbucket").into();

        paths
            .scaffold_for_installing(&storage, &home, &namespace)
            .await?;

        // Lineage fixture: a purely local package, no commit and no remote.
        let lineage_json = r#"{
            "packages": {
                "test/badbucket": {
                    "commit": null,
                    "remote": null,
                    "base_hash": "",
                    "latest_hash": "",
                    "paths": {}
                }
            },
            "home": "/tmp/working_dir"
        }"#;
        storage
            .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
            .await?;

        let domain_lineage_io = DomainLineageIo::new(paths.lineage());
        let package = InstalledPackage {
            lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
            paths,
            remote: BadBucketRemote,
            storage,
            namespace,
        };

        let result = package
            .set_remote("typo-bucket".to_string(), Some("example.com".parse()?))
            .await;

        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("typo-bucket") && msg.contains("not reachable"),
            "error should name the bucket and say it's unreachable, got: {msg}"
        );

        // The remote must NOT have been persisted — pre-flight should fail
        // before any lineage write.
        let lineage = package.lineage().await?;
        assert!(
            lineage.remote_uri.is_none(),
            "remote_uri should not be persisted when verify_bucket fails",
        );

        Ok(())
    }
877
878    #[test(tokio::test)]
879    async fn test_set_remote_rejects_change_on_pushed_package() -> Res {
880        let (home, _temp_dir1) = Home::from_temp_dir()?;
881        let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
882
883        let storage = LocalStorage::new();
884        let remote = MockRemote::default();
885        let namespace: Namespace = ("test", "overwrite").into();
886
887        paths
888            .scaffold_for_installing(&storage, &home, &namespace)
889            .await?;
890
891        let lineage_json = r#"{
892            "packages": {
893                "test/overwrite": {
894                    "commit": null,
895                    "remote": {
896                        "bucket": "old-bucket",
897                        "namespace": "test/overwrite",
898                        "hash": "abc123",
899                        "origin": "old.host"
900                    },
901                    "base_hash": "abc123",
902                    "latest_hash": "abc123",
903                    "paths": {}
904                }
905            },
906            "home": "/tmp/working_dir"
907        }"#;
908        storage
909            .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
910            .await?;
911
912        let domain_lineage_io = DomainLineageIo::new(paths.lineage());
913        let package = InstalledPackage {
914            lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
915            paths,
916            remote,
917            storage,
918            namespace,
919        };
920
921        let result = package
922            .set_remote("new-bucket".to_string(), Some("new.host".parse()?))
923            .await;
924
925        assert!(result.is_err());
926        assert!(
927            result
928                .unwrap_err()
929                .to_string()
930                .contains("Cannot change remote"),
931            "Should reject changing remote on a pushed package"
932        );
933
934        Ok(())
935    }
936
937    #[test(tokio::test)]
938    async fn test_set_remote_is_idempotent_on_pushed_package() -> Res {
939        let (home, _temp_dir1) = Home::from_temp_dir()?;
940        let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
941
942        let storage = LocalStorage::new();
943        let remote = MockRemote::default();
944        let namespace: Namespace = ("test", "idempotent").into();
945
946        paths
947            .scaffold_for_installing(&storage, &home, &namespace)
948            .await?;
949
950        let lineage_json = r#"{
951            "packages": {
952                "test/idempotent": {
953                    "commit": null,
954                    "remote": {
955                        "bucket": "my-bucket",
956                        "namespace": "test/idempotent",
957                        "hash": "abc123",
958                        "origin": "my.host"
959                    },
960                    "base_hash": "abc123",
961                    "latest_hash": "abc123",
962                    "paths": {}
963                }
964            },
965            "home": "/tmp/working_dir"
966        }"#;
967        storage
968            .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
969            .await?;
970
971        let domain_lineage_io = DomainLineageIo::new(paths.lineage());
972        let package = InstalledPackage {
973            lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
974            paths,
975            remote,
976            storage,
977            namespace,
978        };
979
980        // Same bucket+origin as existing — should be a no-op
981        package
982            .set_remote("my-bucket".to_string(), Some("my.host".parse()?))
983            .await?;
984
985        let lineage = package.lineage().await?;
986        let remote_uri = lineage
987            .remote_uri
988            .as_ref()
989            .expect("remote_uri should be set");
990        assert_eq!(remote_uri.hash, "abc123", "hash should be preserved");
991
992        Ok(())
993    }
994
995    #[test(tokio::test)]
996    async fn test_set_remote_overwrites_unpushed_remote() -> Res {
997        let (home, _temp_dir1) = Home::from_temp_dir()?;
998        let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
999
1000        let storage = LocalStorage::new();
1001        let remote = MockRemote::default();
1002        let namespace: Namespace = ("test", "unpushed").into();
1003
1004        paths
1005            .scaffold_for_installing(&storage, &home, &namespace)
1006            .await?;
1007
1008        let lineage_json = r#"{
1009            "packages": {
1010                "test/unpushed": {
1011                    "commit": null,
1012                    "remote": {
1013                        "bucket": "old-bucket",
1014                        "namespace": "test/unpushed",
1015                        "hash": "",
1016                        "origin": "old.host"
1017                    },
1018                    "base_hash": "",
1019                    "latest_hash": "",
1020                    "paths": {}
1021                }
1022            },
1023            "home": "/tmp/working_dir"
1024        }"#;
1025        storage
1026            .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
1027            .await?;
1028
1029        let domain_lineage_io = DomainLineageIo::new(paths.lineage());
1030        let package = InstalledPackage {
1031            lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
1032            paths,
1033            remote,
1034            storage,
1035            namespace,
1036        };
1037
1038        package
1039            .set_remote("new-bucket".to_string(), Some("new.host".parse()?))
1040            .await?;
1041
1042        let lineage = package.lineage().await?;
1043        let remote_uri = lineage
1044            .remote_uri
1045            .as_ref()
1046            .expect("remote_uri should be set");
1047        assert_eq!(remote_uri.origin.as_ref().unwrap().to_string(), "new.host");
1048        assert_eq!(remote_uri.bucket, "new-bucket");
1049        assert_eq!(remote_uri.hash, "", "hash should remain empty");
1050
1051        Ok(())
1052    }
1053
1054    #[test(tokio::test)]
1055    async fn test_manifest_recovery_from_corruption() -> Res {
1056        let (home, _temp_dir1) = Home::from_temp_dir()?;
1057        let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
1058
1059        let storage = LocalStorage::new();
1060        let remote = MockRemote::default();
1061        let namespace: Namespace = ("test", "recovery").into();
1062        let test_hash = "deadbeef".to_string();
1063
1064        paths
1065            .scaffold_for_installing(&storage, &home, &namespace)
1066            .await?;
1067        paths.scaffold_for_caching(&storage, "test-bucket").await?;
1068
1069        // Initialize domain lineage file
1070        let lineage_json = format!(
1071            r#"{{
1072                "packages": {{
1073                    "test/recovery": {{
1074                        "commit": null,
1075                        "remote": {{
1076                            "bucket": "test-bucket",
1077                            "namespace": "test/recovery",
1078                            "hash": "{}",
1079                            "catalog": null
1080                        }},
1081                        "base_hash": "{}",
1082                        "latest_hash": "{}",
1083                        "paths": {{}}
1084                    }}}},
1085                "home": "/tmp/working_dir"
1086                }}"#,
1087            test_hash, "foo", "bar"
1088        );
1089        storage
1090            .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
1091            .await?;
1092
1093        // Set up a valid cached manifest
1094        let reference_manifest = crate::fixtures::manifest::path();
1095        let manifest_uri = ManifestUri {
1096            bucket: "test-bucket".to_string(),
1097            namespace: namespace.clone(),
1098            hash: test_hash.clone(),
1099            origin: None,
1100        };
1101        let cached_manifest = paths.cached_manifest(&manifest_uri);
1102        storage.copy(reference_manifest?, cached_manifest).await?;
1103
1104        // Create a corrupted installed manifest
1105        let installed_manifest = paths.installed_manifest(&namespace, &test_hash);
1106        storage
1107            .write_byte_stream(
1108                &installed_manifest,
1109                ByteStream::from_static(b"corrupted data"),
1110            )
1111            .await?;
1112
1113        let domain_lineage_io = DomainLineageIo::new(paths.lineage());
1114        let package = InstalledPackage {
1115            lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
1116            paths,
1117            remote,
1118            storage: storage.clone(),
1119            namespace,
1120        };
1121
1122        // This should succeed by recovering from cache despite corrupted installed manifest
1123        let result = package.manifest().await;
1124        assert!(
1125            result.is_ok(),
1126            "Should recover from cache when installed is corrupted"
1127        );
1128
1129        // Verify the corrupted file was replaced with good data
1130        let fixed_manifest_content = storage.read_bytes(&installed_manifest).await?;
1131        assert!(
1132            fixed_manifest_content.len() > 10,
1133            "Installed manifest should be fixed"
1134        );
1135        assert!(
1136            !fixed_manifest_content.starts_with(b"corrupted"),
1137            "Should no longer be corrupted"
1138        );
1139
1140        Ok(())
1141    }
1142
1143    #[test(tokio::test)]
1144    async fn test_set_remote_recommits_existing_commit() -> Res {
1145        let (home, _temp_dir1) = Home::from_temp_dir()?;
1146        let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
1147
1148        let storage = LocalStorage::new();
1149        let remote = MockRemote::default();
1150        let namespace: Namespace = ("test", "recommit").into();
1151
1152        paths
1153            .scaffold_for_installing(&storage, &home, &namespace)
1154            .await?;
1155
1156        // Start with no remote and no commit
1157        let lineage_json = r#"{
1158            "packages": {
1159                "test/recommit": {
1160                    "commit": null,
1161                    "remote": null,
1162                    "base_hash": "",
1163                    "latest_hash": "",
1164                    "paths": {}
1165                }
1166            },
1167            "home": "/tmp/working_dir"
1168        }"#;
1169        storage
1170            .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
1171            .await?;
1172
1173        // Write a file to package home so commit has something to pick up
1174        let package_home = home.join(namespace.to_string());
1175        storage.create_dir_all(&package_home).await?;
1176        storage
1177            .write_byte_stream(
1178                package_home.join("data.txt"),
1179                ByteStream::from_static(b"hello world"),
1180            )
1181            .await?;
1182
1183        let domain_lineage_io = DomainLineageIo::new(paths.lineage());
1184        let package = InstalledPackage {
1185            lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
1186            paths,
1187            remote,
1188            storage,
1189            namespace: namespace.clone(),
1190        };
1191
1192        // Commit the package (no remote yet, uses default HostConfig)
1193        let commit = package
1194            .commit(
1195                "Initial commit".to_string(),
1196                Some(serde_json::json!({"key": "value"})),
1197                None,
1198                None,
1199            )
1200            .await?;
1201        let hash_before = commit.hash.clone();
1202
1203        // Now set_remote — this should trigger recommit.
1204        // MockRemote returns HostConfig::default() (SHA256 chunked), same as the
1205        // initial commit, so the row hashes stay the same. But the manifest is
1206        // rebuilt (e.g. workflow may change), and the lineage prev_hashes are updated.
1207        package
1208            .set_remote("my-bucket".to_string(), Some("example.com".parse()?))
1209            .await?;
1210
1211        let lineage = package.lineage().await?;
1212
1213        // Remote should be set
1214        let remote_uri = lineage
1215            .remote_uri
1216            .as_ref()
1217            .expect("remote_uri should be set");
1218        assert_eq!(
1219            remote_uri.origin.as_ref().unwrap().to_string(),
1220            "example.com"
1221        );
1222        assert_eq!(remote_uri.bucket, "my-bucket");
1223
1224        // Recommit should have produced a new commit
1225        let new_commit = lineage.commit.as_ref().expect("commit should exist");
1226        assert_eq!(
1227            new_commit.prev_hashes.first(),
1228            Some(&hash_before),
1229            "Old hash should be in prev_hashes after recommit"
1230        );
1231
1232        // The new manifest should be readable with preserved message and meta
1233        let manifest_path = package
1234            .paths
1235            .installed_manifest(&namespace, &new_commit.hash);
1236        let manifest = Manifest::from_path(&package.storage, &manifest_path).await?;
1237        assert_eq!(
1238            manifest.header.message,
1239            Some("Initial commit".to_string()),
1240            "Message should be preserved after recommit"
1241        );
1242        assert_eq!(
1243            manifest.header.user_meta,
1244            Some(serde_json::json!({"key": "value"})),
1245            "User meta should be preserved after recommit"
1246        );
1247
1248        Ok(())
1249    }
1250
1251    /// A remote that always returns LoginRequired, simulating a logged-out user.
1252    struct LoggedOutRemote;
1253
1254    impl crate::io::remote::Remote for LoggedOutRemote {
1255        async fn exists(&self, _host: &Option<Host>, _s3_uri: &S3Uri) -> Res<bool> {
1256            Err(Error::Login(LoginError::Required(None)))
1257        }
1258        async fn get_object_stream(
1259            &self,
1260            _host: &Option<Host>,
1261            _s3_uri: &S3Uri,
1262        ) -> Res<crate::io::remote::RemoteObjectStream> {
1263            Err(Error::Login(LoginError::Required(None)))
1264        }
1265        async fn resolve_url(&self, _host: &Option<Host>, _s3_uri: &S3Uri) -> Res<S3Uri> {
1266            Err(Error::Login(LoginError::Required(None)))
1267        }
1268        async fn put_object(
1269            &self,
1270            _host: &Option<Host>,
1271            _s3_uri: &S3Uri,
1272            _contents: impl Into<aws_sdk_s3::primitives::ByteStream>,
1273        ) -> Res {
1274            Err(Error::Login(LoginError::Required(None)))
1275        }
1276        async fn upload_file(
1277            &self,
1278            _host_config: &crate::io::remote::HostConfig,
1279            _source_path: impl AsRef<std::path::Path>,
1280            _dest_uri: &S3Uri,
1281            _size: u64,
1282        ) -> Res<(S3Uri, crate::checksum::ObjectHash)> {
1283            Err(Error::Login(LoginError::Required(None)))
1284        }
1285        async fn host_config(&self, _host: &Option<Host>) -> Res<crate::io::remote::HostConfig> {
1286            Ok(crate::io::remote::HostConfig::default())
1287        }
1288        async fn verify_bucket(&self, _bucket: &str) -> Res {
1289            Ok(())
1290        }
1291    }
1292
1293    #[test(tokio::test)]
1294    async fn test_status_propagates_login_required() -> Res {
1295        let (home, _temp_dir1) = Home::from_temp_dir()?;
1296        let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
1297
1298        let storage = LocalStorage::new();
1299        let namespace: Namespace = ("test", "needslogin").into();
1300
1301        paths
1302            .scaffold_for_installing(&storage, &home, &namespace)
1303            .await?;
1304
1305        // Package with remote configured but never pushed (empty hash)
1306        let lineage_json = r#"{
1307            "packages": {
1308                "test/needslogin": {
1309                    "commit": null,
1310                    "remote": {
1311                        "bucket": "my-bucket",
1312                        "namespace": "test/needslogin",
1313                        "hash": "",
1314                        "origin": "nightly.quilttest.com"
1315                    },
1316                    "base_hash": "",
1317                    "latest_hash": "",
1318                    "paths": {}
1319                }
1320            },
1321            "home": "/tmp/working_dir"
1322        }"#;
1323        storage
1324            .write_byte_stream(&paths.lineage(), lineage_json.as_bytes().to_vec().into())
1325            .await?;
1326
1327        let domain_lineage_io = DomainLineageIo::new(paths.lineage());
1328        let package = InstalledPackage {
1329            lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
1330            paths,
1331            remote: LoggedOutRemote,
1332            storage,
1333            namespace,
1334        };
1335
1336        // status() should propagate LoginRequired so the UI can show a Login button
1337        let result = package.status(None).await;
1338        assert!(
1339            matches!(result, Err(Error::Login(LoginError::Required(_)))),
1340            "Expected LoginRequired error, got: {result:?}"
1341        );
1342
1343        Ok(())
1344    }
1345}