Skip to main content

quilt_rs/flow/
publish.rs

1use std::path::PathBuf;
2
3use tracing::debug;
4use tracing::info;
5
6use crate::Res;
7use crate::flow;
8use crate::flow::push::PushResult;
9use crate::io::remote::HostConfig;
10use crate::io::remote::Remote;
11use crate::io::storage::Storage;
12use crate::lineage::InstalledPackageStatus;
13use crate::lineage::PackageLineage;
14use crate::manifest::Manifest;
15use crate::manifest::Workflow;
16use crate::paths::DomainPaths;
17use quilt_uri::Namespace;
18
19/// Options passed to the commit half of [`publish_package`].
20///
21/// All fields are already resolved by the caller (template rendered,
22/// metadata parsed, workflow looked up) — the library does not know
23/// about templates or UI state.
24pub struct CommitOptions {
25    pub message: String,
26    pub user_meta: Option<serde_json::Value>,
27    pub workflow: Option<Workflow>,
28}
29
/// What a successful publish did, tagged by the path that was taken in
/// [`publish_package`].
///
/// The payload type `P` is generic: the flow layer produces
/// `PublishOutcome<PushResult>`, while the public API
/// (`InstalledPackage::publish`) converts it to
/// `PublishOutcome<PushOutcome>` through the `quilt::PublishOutcome`
/// type alias.
#[derive(Debug)]
pub enum PublishOutcome<P> {
    /// Pending changes were committed first, and the fresh revision was
    /// then pushed.
    CommittedAndPushed(P),
    /// No new commit was created (the working directory had no changes);
    /// an already-committed revision was pushed.
    PushedOnly(P),
}

impl<P> PublishOutcome<P> {
    /// Borrows the push payload, whichever variant carries it.
    pub fn push(&self) -> &P {
        // Both variants wrap exactly one `P`, so the or-pattern is
        // irrefutable and a plain `let` is enough.
        let (Self::CommittedAndPushed(payload) | Self::PushedOnly(payload)) = self;
        payload
    }
}
53
54/// Commit any pending working-directory changes and then push the resulting
55/// revision to the remote in one step.
56///
57/// Branches on the pre-publish state:
58///
59/// - `status.changes` empty and `lineage.commit` is `Some` → push only
60///   (reuse the unpushed prior commit; by invariant, a set `commit` is
61///   always unpushed — [`flow::push`] clears it on success)
62/// - otherwise → commit (with the caller's message/metadata/workflow),
63///   then push. `status.changes` may be empty: a message- or
64///   metadata-only revision is a legitimate user intent, and if the
65///   resulting manifest header and rows happen to match the last-pushed
66///   state the content-addressed top hash will match and push is a
67///   no-op — so repeat clicks with identical input don't produce
68///   divergent commits on the remote.
69#[allow(clippy::too_many_arguments)]
70pub async fn publish_package(
71    lineage: PackageLineage,
72    manifest: &mut Manifest,
73    paths: &DomainPaths,
74    storage: &(impl Storage + Sync),
75    remote: &impl Remote,
76    working_dir: PathBuf,
77    status: InstalledPackageStatus,
78    namespace: Namespace,
79    host_config: HostConfig,
80    commit_opts: CommitOptions,
81) -> Res<PublishOutcome<PushResult>> {
82    let has_changes = !status.changes.is_empty();
83    let has_pending_commit = lineage.commit.is_some();
84
85    let (lineage, push_manifest, committed) = if has_pending_commit && !has_changes {
86        debug!("✔️ Publish: reusing pending commit, skipping commit");
87        (lineage, manifest.clone(), false)
88    } else {
89        debug!("⏳ Publish: committing local changes");
90        let (lineage, new_commit) = flow::commit(
91            lineage,
92            manifest,
93            paths,
94            storage,
95            working_dir,
96            status,
97            namespace.clone(),
98            commit_opts.message,
99            commit_opts.user_meta,
100            commit_opts.workflow,
101        )
102        .await?;
103        // commit wrote a new manifest to disk; reload it so push uploads
104        // the new rows, not the pre-commit manifest we were handed.
105        let committed_path = paths.installed_manifest(&namespace, &new_commit.hash);
106        let committed_manifest = Manifest::from_path(storage, &committed_path).await?;
107        debug!("✔️ Publish: commit done");
108        (lineage, committed_manifest, true)
109    };
110
111    info!("⏳ Publish: pushing revision");
112    let push = flow::push(
113        lineage,
114        push_manifest,
115        paths,
116        storage,
117        remote,
118        Some(namespace),
119        host_config,
120    )
121    .await?;
122    info!("✔️ Publish: push done");
123
124    Ok(if committed {
125        PublishOutcome::CommittedAndPushed(push)
126    } else {
127        PublishOutcome::PushedOnly(push)
128    })
129}
130
131#[cfg(test)]
132mod tests {
133    use super::*;
134
135    use std::collections::BTreeMap;
136
137    use aws_sdk_s3::primitives::ByteStream;
138    use test_log::test;
139
140    use crate::fixtures;
141    use crate::io::remote::mocks::MockRemote;
142    use crate::io::storage::mocks::MockStorage;
143    use crate::lineage::Change;
144    use crate::lineage::CommitState;
145    use crate::lineage::PathState;
146    use crate::manifest::ManifestRow;
147    use quilt_uri::ManifestUri;
148    use quilt_uri::S3Uri;
149
150    fn manifest_uri(hash: &str) -> ManifestUri {
151        ManifestUri {
152            bucket: "b".to_string(),
153            namespace: ("foo", "bar").into(),
154            hash: hash.to_string(),
155            origin: None,
156        }
157    }
158
159    fn first_push_uri() -> ManifestUri {
160        // Empty hash triggers the "first push" branch in push_package, so
161        // no remote manifest fetch is required to run the round trip.
162        manifest_uri("")
163    }
164
165    async fn seed_remote_latest(remote: &MockRemote, latest_hash: &str) -> Res {
166        remote
167            .put_object(
168                &None,
169                &S3Uri::try_from("s3://b/.quilt/named_packages/foo/bar/latest")?,
170                latest_hash.as_bytes().to_vec(),
171            )
172            .await?;
173        Ok(())
174    }
175
176    #[test(tokio::test)]
177    async fn test_publish_commits_message_only_and_pushes() -> Res {
178        // No working-dir changes and no pending commit, but the caller
179        // supplied a message. Publish still commits (recording a
180        // message-only revision) and pushes it.
181        let storage = MockStorage::default();
182        let remote = MockRemote::default();
183
184        let lineage = PackageLineage {
185            remote_uri: Some(first_push_uri()),
186            ..PackageLineage::default()
187        };
188        let mut manifest = Manifest::default();
189
190        let outcome = publish_package(
191            lineage,
192            &mut manifest,
193            &DomainPaths::default(),
194            &storage,
195            &remote,
196            PathBuf::default(),
197            InstalledPackageStatus::default(),
198            ("foo", "bar").into(),
199            HostConfig::default(),
200            CommitOptions {
201                message: "Custom message".to_string(),
202                user_meta: None,
203                workflow: None,
204            },
205        )
206        .await?;
207
208        let push = match &outcome {
209            PublishOutcome::CommittedAndPushed(p) => p,
210            PublishOutcome::PushedOnly(_) => {
211                panic!("expected CommittedAndPushed even without file changes");
212            }
213        };
214        assert!(push.certified_latest);
215        assert!(push.lineage.commit.is_none());
216        Ok(())
217    }
218
219    #[test(tokio::test)]
220    async fn test_publish_skips_commit_when_no_changes() -> Res {
221        // Package with a pending local commit, first push. No new
222        // working-dir changes — commit should be skipped.
223        let hash = fixtures::top_hash::EMPTY_NULL_TOP_HASH.to_string();
224        let lineage = PackageLineage {
225            commit: Some(CommitState {
226                timestamp: chrono::Utc::now(),
227                hash: hash.clone(),
228                prev_hashes: Vec::new(),
229            }),
230            remote_uri: Some(first_push_uri()),
231            ..PackageLineage::default()
232        };
233
234        let storage = MockStorage::default();
235        storage
236            .write_byte_stream(
237                PathBuf::from(format!(".quilt/packages/b/{hash}")),
238                ByteStream::from_static(b"foo"),
239            )
240            .await?;
241
242        let remote = MockRemote::default();
243        seed_remote_latest(&remote, &hash).await?;
244
245        let mut manifest = Manifest::default();
246        manifest.header.user_meta = Some(serde_json::Value::Null);
247
248        let outcome = publish_package(
249            lineage,
250            &mut manifest,
251            &DomainPaths::default(),
252            &storage,
253            &remote,
254            PathBuf::default(),
255            InstalledPackageStatus::default(),
256            ("foo", "bar").into(),
257            HostConfig::default(),
258            CommitOptions {
259                message: String::new(),
260                user_meta: None,
261                workflow: None,
262            },
263        )
264        .await?;
265
266        let push = match &outcome {
267            PublishOutcome::PushedOnly(p) => p,
268            PublishOutcome::CommittedAndPushed(_) => {
269                panic!("should skip commit when no changes");
270            }
271        };
272        assert!(push.certified_latest);
273        assert_eq!(push.lineage.remote()?.hash, hash);
274        Ok(())
275    }
276
277    #[test(tokio::test)]
278    async fn test_publish_push_fails_after_successful_commit() -> Res {
279        // Commit succeeds, but lineage has no remote — push bails out.
280        let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
281        let base_record = manifest_src.get_record(&PathBuf::from("0mb.bin")).unwrap();
282        let added = ManifestRow {
283            logical_key: PathBuf::from("foo"),
284            hash: base_record.hash.clone(),
285            size: base_record.size,
286            physical_key: base_record.physical_key.clone(),
287            ..ManifestRow::default()
288        };
289
290        let storage = MockStorage::default();
291        storage
292            .write_byte_stream(PathBuf::from("/working-dir/foo"), ByteStream::default())
293            .await?;
294
295        let status = InstalledPackageStatus {
296            changes: BTreeMap::from([(PathBuf::from("foo"), Change::Added(added))]),
297            ..InstalledPackageStatus::default()
298        };
299
300        let lineage = PackageLineage {
301            paths: BTreeMap::from([(PathBuf::from("foo"), PathState::default())]),
302            ..PackageLineage::default()
303        };
304
305        let remote = MockRemote::default();
306
307        let mut manifest = Manifest::default();
308
309        let err = publish_package(
310            lineage,
311            &mut manifest,
312            &DomainPaths::new(PathBuf::from("/")),
313            &storage,
314            &remote,
315            PathBuf::from("/working-dir"),
316            status,
317            ("foo", "bar").into(),
318            HostConfig::default(),
319            CommitOptions {
320                message: "published".to_string(),
321                user_meta: None,
322                workflow: None,
323            },
324        )
325        .await
326        .unwrap_err();
327        assert!(
328            err.to_string().contains("remote"),
329            "expected remote-missing error, got: {err}"
330        );
331        Ok(())
332    }
333
334    /// Shared setup for the commit-then-push publish tests.
335    ///
336    /// Seeds working-dir and remote object storage with an empty file at
337    /// `{hash_hex}`, and returns a `(storage, remote)` pair ready for use
338    /// by `publish_package` with a first-push lineage.
339    async fn setup_storages_for_commit_and_push(hash_hex: &str) -> Res<(MockStorage, MockRemote)> {
340        let storage = MockStorage::default();
341        storage
342            .write_byte_stream(PathBuf::from("/working-dir/foo"), ByteStream::default())
343            .await?;
344
345        let remote = MockRemote::default();
346        // Commit rewrites the row's physical_key to file:///.quilt/objects/{hash}.
347        // Push reads that path through MockRemote's own storage
348        // (see MockRemote::upload_file), so seed the same empty file there too.
349        let object_path = PathBuf::from(format!("/.quilt/objects/{hash_hex}"));
350        remote
351            .storage
352            .write_byte_stream(object_path, ByteStream::default())
353            .await?;
354        Ok((storage, remote))
355    }
356
357    fn first_push_lineage_with_foo() -> PackageLineage {
358        PackageLineage {
359            paths: BTreeMap::from([(PathBuf::from("foo"), PathState::default())]),
360            remote_uri: Some(first_push_uri()),
361            ..PackageLineage::default()
362        }
363    }
364
365    fn row_from_fixture(fixture: &Manifest, source_key: &str) -> ManifestRow {
366        let base_record = fixture.get_record(&PathBuf::from(source_key)).unwrap();
367        ManifestRow {
368            logical_key: PathBuf::from("foo"),
369            hash: base_record.hash.clone(),
370            size: base_record.size,
371            physical_key: base_record.physical_key.clone(),
372            ..ManifestRow::default()
373        }
374    }
375
376    /// Invariants a successful first-push revision of `foo/bar` must satisfy.
377    ///
378    /// Factored out so the four commit-and-push tests all enforce the same
379    /// post-publish state without re-listing it each time: the new manifest
380    /// hash is non-empty, push cleared the pending commit, and first-push
381    /// certification pinned both `base_hash` and `latest_hash` to the
382    /// revision we just uploaded.
383    fn assert_first_push_of_foo_bar(push: &PushResult) -> Res {
384        assert!(push.certified_latest);
385        let pushed = push.lineage.remote()?;
386        assert!(
387            !pushed.hash.is_empty(),
388            "pushed manifest should have a hash"
389        );
390        assert_eq!(pushed.namespace, ("foo", "bar").into());
391        assert!(
392            push.lineage.commit.is_none(),
393            "publish should clear the pending commit after a successful push"
394        );
395        assert_eq!(
396            push.lineage.base_hash, pushed.hash,
397            "first push should pin base_hash to the uploaded revision"
398        );
399        assert_eq!(
400            push.lineage.latest_hash, pushed.hash,
401            "first push should pin latest_hash to the uploaded revision"
402        );
403        Ok(())
404    }
405
406    #[test(tokio::test)]
407    async fn test_publish_commits_and_pushes_happy_path() -> Res {
408        // Full happy path: working-dir changes → commit succeeds → push succeeds.
409        // Mirrors the setup of `test_publish_push_fails_after_successful_commit`,
410        // but gives the lineage a first-push `remote_uri` and seeds the remote
411        // so `upload_row` / `tag_latest` can complete.
412        let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
413        let added = row_from_fixture(&manifest_src, "0mb.bin");
414
415        let (storage, remote) =
416            setup_storages_for_commit_and_push(fixtures::objects::ZERO_HASH_HEX).await?;
417
418        let status = InstalledPackageStatus {
419            changes: BTreeMap::from([(PathBuf::from("foo"), Change::Added(added))]),
420            ..InstalledPackageStatus::default()
421        };
422
423        let mut manifest = Manifest::default();
424
425        let outcome = publish_package(
426            first_push_lineage_with_foo(),
427            &mut manifest,
428            &DomainPaths::new(PathBuf::from("/")),
429            &storage,
430            &remote,
431            PathBuf::from("/working-dir"),
432            status,
433            ("foo", "bar").into(),
434            HostConfig::default(),
435            CommitOptions {
436                message: "published".to_string(),
437                user_meta: None,
438                workflow: None,
439            },
440        )
441        .await?;
442
443        let push = match &outcome {
444            PublishOutcome::CommittedAndPushed(p) => p,
445            PublishOutcome::PushedOnly(_) => {
446                panic!("expected CommittedAndPushed, got PushedOnly");
447            }
448        };
449        assert_first_push_of_foo_bar(push)
450    }
451
452    #[test(tokio::test)]
453    async fn test_publish_modifies_file_and_pushes() -> Res {
454        // Mirrors `flow::commit::test_modifying_and_commit` on the commit
455        // side: the initial manifest has a row at "foo", and `Change::Modified`
456        // swaps its content to a new hash. The resulting revision is then
457        // pushed end-to-end.
458        let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
459        let modified = row_from_fixture(&manifest_src, "less-then-8mb.txt");
460
461        // Seed working-dir and remote objects with the *real* less-than-8mb
462        // bytes so the declared row hash matches what MockRemote computes on
463        // upload. If we seeded zero bytes here, push would compute the hash
464        // of zero bytes and the top-hash check ("local == pushed") would fail.
465        let storage = MockStorage::default();
466        storage
467            .write_byte_stream(
468                PathBuf::from("/working-dir/foo"),
469                ByteStream::from_static(fixtures::objects::less_than_8mb()),
470            )
471            .await?;
472        let remote = MockRemote::default();
473        let object_path = PathBuf::from(format!(
474            "/.quilt/objects/{}",
475            fixtures::objects::LESS_THAN_8MB_HASH_HEX
476        ));
477        remote
478            .storage
479            .write_byte_stream(
480                object_path,
481                ByteStream::from_static(fixtures::objects::less_than_8mb()),
482            )
483            .await?;
484
485        let status = InstalledPackageStatus {
486            changes: BTreeMap::from([(PathBuf::from("foo"), Change::Modified(modified))]),
487            ..InstalledPackageStatus::default()
488        };
489
490        // Initial manifest has "foo" pointing at zero-byte content — this
491        // is the row `Change::Modified` replaces.
492        let mut manifest = Manifest::default();
493        manifest
494            .insert_record(row_from_fixture(&manifest_src, "0mb.bin"))
495            .await?;
496
497        let outcome = publish_package(
498            first_push_lineage_with_foo(),
499            &mut manifest,
500            &DomainPaths::new(PathBuf::from("/")),
501            &storage,
502            &remote,
503            PathBuf::from("/working-dir"),
504            status,
505            ("foo", "bar").into(),
506            HostConfig::default(),
507            CommitOptions {
508                message: "modified".to_string(),
509                user_meta: None,
510                workflow: None,
511            },
512        )
513        .await?;
514
515        let push = match &outcome {
516            PublishOutcome::CommittedAndPushed(p) => p,
517            PublishOutcome::PushedOnly(_) => {
518                panic!("expected CommittedAndPushed, got PushedOnly");
519            }
520        };
521        assert_first_push_of_foo_bar(push)
522    }
523
524    #[test(tokio::test)]
525    async fn test_publish_removes_file_and_pushes() -> Res {
526        // Mirrors `flow::commit::test_removing_and_commit`: initial manifest
527        // has "foo", `Change::Removed` drops it, and publish pushes the
528        // resulting empty manifest. Push uploads zero rows but still writes
529        // the manifest file and certifies it as latest.
530        let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
531        let existing = row_from_fixture(&manifest_src, "0mb.bin");
532
533        // Remove has nothing to copy into the object store, but setup still
534        // seeds /working-dir/foo so the helper stays uniform across tests.
535        let (storage, remote) =
536            setup_storages_for_commit_and_push(fixtures::objects::ZERO_HASH_HEX).await?;
537
538        let status = InstalledPackageStatus {
539            changes: BTreeMap::from([(PathBuf::from("foo"), Change::Removed(existing.clone()))]),
540            ..InstalledPackageStatus::default()
541        };
542
543        let mut manifest = Manifest::default();
544        manifest.insert_record(existing).await?;
545
546        let outcome = publish_package(
547            first_push_lineage_with_foo(),
548            &mut manifest,
549            &DomainPaths::new(PathBuf::from("/")),
550            &storage,
551            &remote,
552            PathBuf::from("/working-dir"),
553            status,
554            ("foo", "bar").into(),
555            HostConfig::default(),
556            CommitOptions {
557                message: "removed".to_string(),
558                user_meta: None,
559                workflow: None,
560            },
561        )
562        .await?;
563
564        let push = match &outcome {
565            PublishOutcome::CommittedAndPushed(p) => p,
566            PublishOutcome::PushedOnly(_) => {
567                panic!("expected CommittedAndPushed, got PushedOnly");
568            }
569        };
570        assert_first_push_of_foo_bar(push)?;
571        assert!(
572            !push.lineage.paths.contains_key(&PathBuf::from("foo")),
573            "lineage.paths should no longer track the removed file"
574        );
575        Ok(())
576    }
577
578    #[test(tokio::test)]
579    async fn test_publish_with_meta_and_pushes() -> Res {
580        // Mirrors `flow::commit::test_commit_meta` in the publish flow:
581        // a non-empty `user_meta` and commit message flow through
582        // `CommitOptions` into `flow::commit`, then push succeeds.
583        let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
584        let added = row_from_fixture(&manifest_src, "0mb.bin");
585
586        let (storage, remote) =
587            setup_storages_for_commit_and_push(fixtures::objects::ZERO_HASH_HEX).await?;
588
589        let status = InstalledPackageStatus {
590            changes: BTreeMap::from([(PathBuf::from("foo"), Change::Added(added))]),
591            ..InstalledPackageStatus::default()
592        };
593
594        let mut manifest = Manifest::default();
595
596        let outcome = publish_package(
597            first_push_lineage_with_foo(),
598            &mut manifest,
599            &DomainPaths::new(PathBuf::from("/")),
600            &storage,
601            &remote,
602            PathBuf::from("/working-dir"),
603            status,
604            ("foo", "bar").into(),
605            HostConfig::default(),
606            CommitOptions {
607                message: "Lorem ipsum".to_string(),
608                user_meta: Some(serde_json::json!({"lorem": "ipsum"})),
609                workflow: None,
610            },
611        )
612        .await?;
613
614        let push = match &outcome {
615            PublishOutcome::CommittedAndPushed(p) => p,
616            PublishOutcome::PushedOnly(_) => {
617                panic!("expected CommittedAndPushed, got PushedOnly");
618            }
619        };
620        assert_first_push_of_foo_bar(push)
621    }
622}