1use std::path::PathBuf;
2
3use tracing::debug;
4use tracing::info;
5
6use crate::Res;
7use crate::flow;
8use crate::flow::push::PushResult;
9use crate::io::remote::HostConfig;
10use crate::io::remote::Remote;
11use crate::io::storage::Storage;
12use crate::lineage::InstalledPackageStatus;
13use crate::lineage::PackageLineage;
14use crate::manifest::Manifest;
15use crate::manifest::Workflow;
16use crate::paths::DomainPaths;
17use quilt_uri::Namespace;
18
/// Commit parameters accepted by `publish_package`.
///
/// The fields are handed to `flow::commit` when publishing has to create a
/// commit. When publishing reuses an already pending commit instead, these
/// options are not consulted.
pub struct CommitOptions {
    /// Message passed to the commit step.
    pub message: String,
    /// Optional user metadata passed to the commit step.
    pub user_meta: Option<serde_json::Value>,
    /// Optional workflow passed to the commit step.
    pub workflow: Option<Workflow>,
}
29
/// Outcome of a publish run, wrapping the push payload `P`.
///
/// The variant tells the caller whether a commit was created before the push.
#[derive(Debug)]
pub enum PublishOutcome<P> {
    /// A commit was created first and the resulting revision was pushed.
    CommittedAndPushed(P),
    /// No commit was created; an already pending commit was pushed as-is.
    PushedOnly(P),
}

impl<P> PublishOutcome<P> {
    /// Borrows the push payload, whichever variant holds it.
    pub fn push(&self) -> &P {
        // Both variants carry the same payload type, so a single irrefutable
        // or-pattern covers the whole enum.
        let (Self::CommittedAndPushed(payload) | Self::PushedOnly(payload)) = self;
        payload
    }
}
53
54#[allow(clippy::too_many_arguments)]
70pub async fn publish_package(
71 lineage: PackageLineage,
72 manifest: &mut Manifest,
73 paths: &DomainPaths,
74 storage: &(impl Storage + Sync),
75 remote: &impl Remote,
76 working_dir: PathBuf,
77 status: InstalledPackageStatus,
78 namespace: Namespace,
79 host_config: HostConfig,
80 commit_opts: CommitOptions,
81) -> Res<PublishOutcome<PushResult>> {
82 let has_changes = !status.changes.is_empty();
83 let has_pending_commit = lineage.commit.is_some();
84
85 let (lineage, push_manifest, committed) = if has_pending_commit && !has_changes {
86 debug!("✔️ Publish: reusing pending commit, skipping commit");
87 (lineage, manifest.clone(), false)
88 } else {
89 debug!("⏳ Publish: committing local changes");
90 let (lineage, new_commit) = flow::commit(
91 lineage,
92 manifest,
93 paths,
94 storage,
95 working_dir,
96 status,
97 namespace.clone(),
98 commit_opts.message,
99 commit_opts.user_meta,
100 commit_opts.workflow,
101 )
102 .await?;
103 let committed_path = paths.installed_manifest(&namespace, &new_commit.hash);
106 let committed_manifest = Manifest::from_path(storage, &committed_path).await?;
107 debug!("✔️ Publish: commit done");
108 (lineage, committed_manifest, true)
109 };
110
111 info!("⏳ Publish: pushing revision");
112 let push = flow::push(
113 lineage,
114 push_manifest,
115 paths,
116 storage,
117 remote,
118 Some(namespace),
119 host_config,
120 )
121 .await?;
122 info!("✔️ Publish: push done");
123
124 Ok(if committed {
125 PublishOutcome::CommittedAndPushed(push)
126 } else {
127 PublishOutcome::PushedOnly(push)
128 })
129}
130
131#[cfg(test)]
132mod tests {
133 use super::*;
134
135 use std::collections::BTreeMap;
136
137 use aws_sdk_s3::primitives::ByteStream;
138 use test_log::test;
139
140 use crate::fixtures;
141 use crate::io::remote::mocks::MockRemote;
142 use crate::io::storage::mocks::MockStorage;
143 use crate::lineage::Change;
144 use crate::lineage::CommitState;
145 use crate::lineage::PathState;
146 use crate::manifest::ManifestRow;
147 use quilt_uri::ManifestUri;
148 use quilt_uri::S3Uri;
149
150 fn manifest_uri(hash: &str) -> ManifestUri {
151 ManifestUri {
152 bucket: "b".to_string(),
153 namespace: ("foo", "bar").into(),
154 hash: hash.to_string(),
155 origin: None,
156 }
157 }
158
159 fn first_push_uri() -> ManifestUri {
160 manifest_uri("")
163 }
164
165 async fn seed_remote_latest(remote: &MockRemote, latest_hash: &str) -> Res {
166 remote
167 .put_object(
168 &None,
169 &S3Uri::try_from("s3://b/.quilt/named_packages/foo/bar/latest")?,
170 latest_hash.as_bytes().to_vec(),
171 )
172 .await?;
173 Ok(())
174 }
175
176 #[test(tokio::test)]
177 async fn test_publish_commits_message_only_and_pushes() -> Res {
178 let storage = MockStorage::default();
182 let remote = MockRemote::default();
183
184 let lineage = PackageLineage {
185 remote_uri: Some(first_push_uri()),
186 ..PackageLineage::default()
187 };
188 let mut manifest = Manifest::default();
189
190 let outcome = publish_package(
191 lineage,
192 &mut manifest,
193 &DomainPaths::default(),
194 &storage,
195 &remote,
196 PathBuf::default(),
197 InstalledPackageStatus::default(),
198 ("foo", "bar").into(),
199 HostConfig::default(),
200 CommitOptions {
201 message: "Custom message".to_string(),
202 user_meta: None,
203 workflow: None,
204 },
205 )
206 .await?;
207
208 let push = match &outcome {
209 PublishOutcome::CommittedAndPushed(p) => p,
210 PublishOutcome::PushedOnly(_) => {
211 panic!("expected CommittedAndPushed even without file changes");
212 }
213 };
214 assert!(push.certified_latest);
215 assert!(push.lineage.commit.is_none());
216 Ok(())
217 }
218
219 #[test(tokio::test)]
220 async fn test_publish_skips_commit_when_no_changes() -> Res {
221 let hash = fixtures::top_hash::EMPTY_NULL_TOP_HASH.to_string();
224 let lineage = PackageLineage {
225 commit: Some(CommitState {
226 timestamp: chrono::Utc::now(),
227 hash: hash.clone(),
228 prev_hashes: Vec::new(),
229 }),
230 remote_uri: Some(first_push_uri()),
231 ..PackageLineage::default()
232 };
233
234 let storage = MockStorage::default();
235 storage
236 .write_byte_stream(
237 PathBuf::from(format!(".quilt/packages/b/{hash}")),
238 ByteStream::from_static(b"foo"),
239 )
240 .await?;
241
242 let remote = MockRemote::default();
243 seed_remote_latest(&remote, &hash).await?;
244
245 let mut manifest = Manifest::default();
246 manifest.header.user_meta = Some(serde_json::Value::Null);
247
248 let outcome = publish_package(
249 lineage,
250 &mut manifest,
251 &DomainPaths::default(),
252 &storage,
253 &remote,
254 PathBuf::default(),
255 InstalledPackageStatus::default(),
256 ("foo", "bar").into(),
257 HostConfig::default(),
258 CommitOptions {
259 message: String::new(),
260 user_meta: None,
261 workflow: None,
262 },
263 )
264 .await?;
265
266 let push = match &outcome {
267 PublishOutcome::PushedOnly(p) => p,
268 PublishOutcome::CommittedAndPushed(_) => {
269 panic!("should skip commit when no changes");
270 }
271 };
272 assert!(push.certified_latest);
273 assert_eq!(push.lineage.remote()?.hash, hash);
274 Ok(())
275 }
276
277 #[test(tokio::test)]
278 async fn test_publish_push_fails_after_successful_commit() -> Res {
279 let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
281 let base_record = manifest_src.get_record(&PathBuf::from("0mb.bin")).unwrap();
282 let added = ManifestRow {
283 logical_key: PathBuf::from("foo"),
284 hash: base_record.hash.clone(),
285 size: base_record.size,
286 physical_key: base_record.physical_key.clone(),
287 ..ManifestRow::default()
288 };
289
290 let storage = MockStorage::default();
291 storage
292 .write_byte_stream(PathBuf::from("/working-dir/foo"), ByteStream::default())
293 .await?;
294
295 let status = InstalledPackageStatus {
296 changes: BTreeMap::from([(PathBuf::from("foo"), Change::Added(added))]),
297 ..InstalledPackageStatus::default()
298 };
299
300 let lineage = PackageLineage {
301 paths: BTreeMap::from([(PathBuf::from("foo"), PathState::default())]),
302 ..PackageLineage::default()
303 };
304
305 let remote = MockRemote::default();
306
307 let mut manifest = Manifest::default();
308
309 let err = publish_package(
310 lineage,
311 &mut manifest,
312 &DomainPaths::new(PathBuf::from("/")),
313 &storage,
314 &remote,
315 PathBuf::from("/working-dir"),
316 status,
317 ("foo", "bar").into(),
318 HostConfig::default(),
319 CommitOptions {
320 message: "published".to_string(),
321 user_meta: None,
322 workflow: None,
323 },
324 )
325 .await
326 .unwrap_err();
327 assert!(
328 err.to_string().contains("remote"),
329 "expected remote-missing error, got: {err}"
330 );
331 Ok(())
332 }
333
334 async fn setup_storages_for_commit_and_push(hash_hex: &str) -> Res<(MockStorage, MockRemote)> {
340 let storage = MockStorage::default();
341 storage
342 .write_byte_stream(PathBuf::from("/working-dir/foo"), ByteStream::default())
343 .await?;
344
345 let remote = MockRemote::default();
346 let object_path = PathBuf::from(format!("/.quilt/objects/{hash_hex}"));
350 remote
351 .storage
352 .write_byte_stream(object_path, ByteStream::default())
353 .await?;
354 Ok((storage, remote))
355 }
356
357 fn first_push_lineage_with_foo() -> PackageLineage {
358 PackageLineage {
359 paths: BTreeMap::from([(PathBuf::from("foo"), PathState::default())]),
360 remote_uri: Some(first_push_uri()),
361 ..PackageLineage::default()
362 }
363 }
364
365 fn row_from_fixture(fixture: &Manifest, source_key: &str) -> ManifestRow {
366 let base_record = fixture.get_record(&PathBuf::from(source_key)).unwrap();
367 ManifestRow {
368 logical_key: PathBuf::from("foo"),
369 hash: base_record.hash.clone(),
370 size: base_record.size,
371 physical_key: base_record.physical_key.clone(),
372 ..ManifestRow::default()
373 }
374 }
375
376 fn assert_first_push_of_foo_bar(push: &PushResult) -> Res {
384 assert!(push.certified_latest);
385 let pushed = push.lineage.remote()?;
386 assert!(
387 !pushed.hash.is_empty(),
388 "pushed manifest should have a hash"
389 );
390 assert_eq!(pushed.namespace, ("foo", "bar").into());
391 assert!(
392 push.lineage.commit.is_none(),
393 "publish should clear the pending commit after a successful push"
394 );
395 assert_eq!(
396 push.lineage.base_hash, pushed.hash,
397 "first push should pin base_hash to the uploaded revision"
398 );
399 assert_eq!(
400 push.lineage.latest_hash, pushed.hash,
401 "first push should pin latest_hash to the uploaded revision"
402 );
403 Ok(())
404 }
405
406 #[test(tokio::test)]
407 async fn test_publish_commits_and_pushes_happy_path() -> Res {
408 let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
413 let added = row_from_fixture(&manifest_src, "0mb.bin");
414
415 let (storage, remote) =
416 setup_storages_for_commit_and_push(fixtures::objects::ZERO_HASH_HEX).await?;
417
418 let status = InstalledPackageStatus {
419 changes: BTreeMap::from([(PathBuf::from("foo"), Change::Added(added))]),
420 ..InstalledPackageStatus::default()
421 };
422
423 let mut manifest = Manifest::default();
424
425 let outcome = publish_package(
426 first_push_lineage_with_foo(),
427 &mut manifest,
428 &DomainPaths::new(PathBuf::from("/")),
429 &storage,
430 &remote,
431 PathBuf::from("/working-dir"),
432 status,
433 ("foo", "bar").into(),
434 HostConfig::default(),
435 CommitOptions {
436 message: "published".to_string(),
437 user_meta: None,
438 workflow: None,
439 },
440 )
441 .await?;
442
443 let push = match &outcome {
444 PublishOutcome::CommittedAndPushed(p) => p,
445 PublishOutcome::PushedOnly(_) => {
446 panic!("expected CommittedAndPushed, got PushedOnly");
447 }
448 };
449 assert_first_push_of_foo_bar(push)
450 }
451
452 #[test(tokio::test)]
453 async fn test_publish_modifies_file_and_pushes() -> Res {
454 let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
459 let modified = row_from_fixture(&manifest_src, "less-then-8mb.txt");
460
461 let storage = MockStorage::default();
466 storage
467 .write_byte_stream(
468 PathBuf::from("/working-dir/foo"),
469 ByteStream::from_static(fixtures::objects::less_than_8mb()),
470 )
471 .await?;
472 let remote = MockRemote::default();
473 let object_path = PathBuf::from(format!(
474 "/.quilt/objects/{}",
475 fixtures::objects::LESS_THAN_8MB_HASH_HEX
476 ));
477 remote
478 .storage
479 .write_byte_stream(
480 object_path,
481 ByteStream::from_static(fixtures::objects::less_than_8mb()),
482 )
483 .await?;
484
485 let status = InstalledPackageStatus {
486 changes: BTreeMap::from([(PathBuf::from("foo"), Change::Modified(modified))]),
487 ..InstalledPackageStatus::default()
488 };
489
490 let mut manifest = Manifest::default();
493 manifest
494 .insert_record(row_from_fixture(&manifest_src, "0mb.bin"))
495 .await?;
496
497 let outcome = publish_package(
498 first_push_lineage_with_foo(),
499 &mut manifest,
500 &DomainPaths::new(PathBuf::from("/")),
501 &storage,
502 &remote,
503 PathBuf::from("/working-dir"),
504 status,
505 ("foo", "bar").into(),
506 HostConfig::default(),
507 CommitOptions {
508 message: "modified".to_string(),
509 user_meta: None,
510 workflow: None,
511 },
512 )
513 .await?;
514
515 let push = match &outcome {
516 PublishOutcome::CommittedAndPushed(p) => p,
517 PublishOutcome::PushedOnly(_) => {
518 panic!("expected CommittedAndPushed, got PushedOnly");
519 }
520 };
521 assert_first_push_of_foo_bar(push)
522 }
523
524 #[test(tokio::test)]
525 async fn test_publish_removes_file_and_pushes() -> Res {
526 let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
531 let existing = row_from_fixture(&manifest_src, "0mb.bin");
532
533 let (storage, remote) =
536 setup_storages_for_commit_and_push(fixtures::objects::ZERO_HASH_HEX).await?;
537
538 let status = InstalledPackageStatus {
539 changes: BTreeMap::from([(PathBuf::from("foo"), Change::Removed(existing.clone()))]),
540 ..InstalledPackageStatus::default()
541 };
542
543 let mut manifest = Manifest::default();
544 manifest.insert_record(existing).await?;
545
546 let outcome = publish_package(
547 first_push_lineage_with_foo(),
548 &mut manifest,
549 &DomainPaths::new(PathBuf::from("/")),
550 &storage,
551 &remote,
552 PathBuf::from("/working-dir"),
553 status,
554 ("foo", "bar").into(),
555 HostConfig::default(),
556 CommitOptions {
557 message: "removed".to_string(),
558 user_meta: None,
559 workflow: None,
560 },
561 )
562 .await?;
563
564 let push = match &outcome {
565 PublishOutcome::CommittedAndPushed(p) => p,
566 PublishOutcome::PushedOnly(_) => {
567 panic!("expected CommittedAndPushed, got PushedOnly");
568 }
569 };
570 assert_first_push_of_foo_bar(push)?;
571 assert!(
572 !push.lineage.paths.contains_key(&PathBuf::from("foo")),
573 "lineage.paths should no longer track the removed file"
574 );
575 Ok(())
576 }
577
578 #[test(tokio::test)]
579 async fn test_publish_with_meta_and_pushes() -> Res {
580 let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
584 let added = row_from_fixture(&manifest_src, "0mb.bin");
585
586 let (storage, remote) =
587 setup_storages_for_commit_and_push(fixtures::objects::ZERO_HASH_HEX).await?;
588
589 let status = InstalledPackageStatus {
590 changes: BTreeMap::from([(PathBuf::from("foo"), Change::Added(added))]),
591 ..InstalledPackageStatus::default()
592 };
593
594 let mut manifest = Manifest::default();
595
596 let outcome = publish_package(
597 first_push_lineage_with_foo(),
598 &mut manifest,
599 &DomainPaths::new(PathBuf::from("/")),
600 &storage,
601 &remote,
602 PathBuf::from("/working-dir"),
603 status,
604 ("foo", "bar").into(),
605 HostConfig::default(),
606 CommitOptions {
607 message: "Lorem ipsum".to_string(),
608 user_meta: Some(serde_json::json!({"lorem": "ipsum"})),
609 workflow: None,
610 },
611 )
612 .await?;
613
614 let push = match &outcome {
615 PublishOutcome::CommittedAndPushed(p) => p,
616 PublishOutcome::PushedOnly(_) => {
617 panic!("expected CommittedAndPushed, got PushedOnly");
618 }
619 };
620 assert_first_push_of_foo_bar(push)
621 }
622}