use std::path::PathBuf;
use tracing::debug;
use tracing::info;
use crate::Res;
use crate::flow;
use crate::flow::push::PushResult;
use crate::io::remote::HostConfig;
use crate::io::remote::Remote;
use crate::io::storage::Storage;
use crate::lineage::InstalledPackageStatus;
use crate::lineage::PackageLineage;
use crate::manifest::Manifest;
use crate::manifest::Workflow;
use crate::paths::DomainPaths;
use quilt_uri::Namespace;
/// Commit parameters that `publish_package` forwards to `flow::commit`
/// when it has to create a commit before pushing.
pub struct CommitOptions {
/// Commit message handed to `flow::commit`.
pub message: String,
/// Optional user metadata handed to `flow::commit`.
pub user_meta: Option<serde_json::Value>,
/// Optional workflow handed to `flow::commit`.
pub workflow: Option<Workflow>,
}
/// Outcome of a publish, recording whether a commit step ran before the push.
///
/// Both variants wrap the same push payload `P`.
#[derive(Debug)]
pub enum PublishOutcome<P> {
    /// A commit was created and then pushed.
    CommittedAndPushed(P),
    /// The commit step was skipped; only the push ran.
    PushedOnly(P),
}

impl<P> PublishOutcome<P> {
    /// Borrows the push payload, whichever variant holds it.
    pub fn push(&self) -> &P {
        // Every variant carries exactly one `P`, so the or-pattern is irrefutable.
        let (Self::CommittedAndPushed(payload) | Self::PushedOnly(payload)) = self;
        payload
    }
}
#[allow(clippy::too_many_arguments)]
pub async fn publish_package(
lineage: PackageLineage,
manifest: &mut Manifest,
paths: &DomainPaths,
storage: &(impl Storage + Sync),
remote: &impl Remote,
working_dir: PathBuf,
status: InstalledPackageStatus,
namespace: Namespace,
host_config: HostConfig,
commit_opts: CommitOptions,
) -> Res<PublishOutcome<PushResult>> {
let has_changes = !status.changes.is_empty();
let has_pending_commit = lineage.commit.is_some();
let (lineage, push_manifest, committed) = if has_pending_commit && !has_changes {
debug!("✔️ Publish: reusing pending commit, skipping commit");
(lineage, manifest.clone(), false)
} else {
debug!("⏳ Publish: committing local changes");
let (lineage, new_commit) = flow::commit(
lineage,
manifest,
paths,
storage,
working_dir,
status,
namespace.clone(),
commit_opts.message,
commit_opts.user_meta,
commit_opts.workflow,
)
.await?;
let committed_path = paths.installed_manifest(&namespace, &new_commit.hash);
let committed_manifest = Manifest::from_path(storage, &committed_path).await?;
debug!("✔️ Publish: commit done");
(lineage, committed_manifest, true)
};
info!("⏳ Publish: pushing revision");
let push = flow::push(
lineage,
push_manifest,
paths,
storage,
remote,
Some(namespace),
host_config,
)
.await?;
info!("✔️ Publish: push done");
Ok(if committed {
PublishOutcome::CommittedAndPushed(push)
} else {
PublishOutcome::PushedOnly(push)
})
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::BTreeMap;
use aws_sdk_s3::primitives::ByteStream;
use test_log::test;
use crate::fixtures;
use crate::io::remote::mocks::MockRemote;
use crate::io::storage::mocks::MockStorage;
use crate::lineage::Change;
use crate::lineage::CommitState;
use crate::lineage::PathState;
use crate::manifest::ManifestRow;
use quilt_uri::ManifestUri;
use quilt_uri::S3Uri;
/// Builds a `ManifestUri` for bucket `b` and namespace `("foo", "bar")`
/// with the given `hash` and no origin.
fn manifest_uri(hash: &str) -> ManifestUri {
    let bucket = String::from("b");
    let hash = hash.to_owned();
    ManifestUri {
        bucket,
        namespace: ("foo", "bar").into(),
        hash,
        origin: None,
    }
}
/// `ManifestUri` with an empty hash, used by these tests as the lineage's
/// remote URI for first-push scenarios.
fn first_push_uri() -> ManifestUri {
    let no_hash = "";
    manifest_uri(no_hash)
}
/// Writes `latest_hash` as the body of the
/// `s3://b/.quilt/named_packages/foo/bar/latest` object on the mock remote.
async fn seed_remote_latest(remote: &MockRemote, latest_hash: &str) -> Res {
    let latest_pointer = S3Uri::try_from("s3://b/.quilt/named_packages/foo/bar/latest")?;
    let body = latest_hash.as_bytes().to_vec();
    remote.put_object(&None, &latest_pointer, body).await?;
    Ok(())
}
/// With no pending commit and no file changes, publish still runs the commit
/// step (message only) and then pushes.
#[test(tokio::test)]
async fn test_publish_commits_message_only_and_pushes() -> Res {
    let storage = MockStorage::default();
    let remote = MockRemote::default();
    let paths = DomainPaths::default();
    let lineage = PackageLineage {
        remote_uri: Some(first_push_uri()),
        ..PackageLineage::default()
    };
    let mut manifest = Manifest::default();
    let commit_opts = CommitOptions {
        message: "Custom message".to_string(),
        user_meta: None,
        workflow: None,
    };

    let outcome = publish_package(
        lineage,
        &mut manifest,
        &paths,
        &storage,
        &remote,
        PathBuf::default(),
        InstalledPackageStatus::default(),
        ("foo", "bar").into(),
        HostConfig::default(),
        commit_opts,
    )
    .await?;

    let PublishOutcome::CommittedAndPushed(push) = &outcome else {
        panic!("expected CommittedAndPushed even without file changes");
    };
    assert!(push.certified_latest);
    assert!(push.lineage.commit.is_none());
    Ok(())
}
/// When the lineage already holds a pending commit and the status has no
/// changes, publish skips the commit step and only pushes.
#[test(tokio::test)]
async fn test_publish_skips_commit_when_no_changes() -> Res {
    let hash = fixtures::top_hash::EMPTY_NULL_TOP_HASH.to_string();

    let pending_commit = CommitState {
        timestamp: chrono::Utc::now(),
        hash: hash.clone(),
        prev_hashes: vec![],
    };
    let lineage = PackageLineage {
        commit: Some(pending_commit),
        remote_uri: Some(first_push_uri()),
        ..PackageLineage::default()
    };

    let storage = MockStorage::default();
    let stored_manifest_path = PathBuf::from(format!(".quilt/packages/b/{hash}"));
    storage
        .write_byte_stream(stored_manifest_path, ByteStream::from_static(b"foo"))
        .await?;

    let remote = MockRemote::default();
    seed_remote_latest(&remote, &hash).await?;

    let mut manifest = Manifest::default();
    manifest.header.user_meta = Some(serde_json::Value::Null);

    let paths = DomainPaths::default();
    let commit_opts = CommitOptions {
        message: String::new(),
        user_meta: None,
        workflow: None,
    };
    let outcome = publish_package(
        lineage,
        &mut manifest,
        &paths,
        &storage,
        &remote,
        PathBuf::default(),
        InstalledPackageStatus::default(),
        ("foo", "bar").into(),
        HostConfig::default(),
        commit_opts,
    )
    .await?;

    let PublishOutcome::PushedOnly(push) = &outcome else {
        panic!("should skip commit when no changes");
    };
    assert!(push.certified_latest);
    assert_eq!(push.lineage.remote()?.hash, hash);
    Ok(())
}
/// The lineage has no `remote_uri`, so publish is expected to return an error
/// whose message mentions "remote".
#[test(tokio::test)]
async fn test_publish_push_fails_after_successful_commit() -> Res {
    let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
    let added = row_from_fixture(&manifest_src, "0mb.bin");

    let storage = MockStorage::default();
    let local_file = PathBuf::from("/working-dir/foo");
    storage
        .write_byte_stream(local_file, ByteStream::default())
        .await?;

    let mut changes = BTreeMap::new();
    changes.insert(PathBuf::from("foo"), Change::Added(added));
    let status = InstalledPackageStatus {
        changes,
        ..InstalledPackageStatus::default()
    };

    let mut tracked_paths = BTreeMap::new();
    tracked_paths.insert(PathBuf::from("foo"), PathState::default());
    let lineage = PackageLineage {
        paths: tracked_paths,
        ..PackageLineage::default()
    };

    let remote = MockRemote::default();
    let mut manifest = Manifest::default();
    let paths = DomainPaths::new(PathBuf::from("/"));
    let commit_opts = CommitOptions {
        message: "published".to_string(),
        user_meta: None,
        workflow: None,
    };

    let publish_result = publish_package(
        lineage,
        &mut manifest,
        &paths,
        &storage,
        &remote,
        PathBuf::from("/working-dir"),
        status,
        ("foo", "bar").into(),
        HostConfig::default(),
        commit_opts,
    )
    .await;
    let err = publish_result.unwrap_err();

    assert!(
        err.to_string().contains("remote"),
        "expected remote-missing error, got: {err}"
    );
    Ok(())
}
/// Creates the mock local storage and mock remote used by commit-and-push tests.
///
/// Writes an empty `/working-dir/foo` to the local storage and an empty
/// `/.quilt/objects/{hash_hex}` to the remote's storage.
async fn setup_storages_for_commit_and_push(hash_hex: &str) -> Res<(MockStorage, MockRemote)> {
    let local = MockStorage::default();
    let working_file = PathBuf::from("/working-dir/foo");
    local
        .write_byte_stream(working_file, ByteStream::default())
        .await?;

    let mock_remote = MockRemote::default();
    let remote_object = PathBuf::from(format!("/.quilt/objects/{hash_hex}"));
    mock_remote
        .storage
        .write_byte_stream(remote_object, ByteStream::default())
        .await?;

    Ok((local, mock_remote))
}
/// Lineage that tracks the single path `foo` (default `PathState`) and uses
/// `first_push_uri()` as its remote URI; all other fields are defaults.
fn first_push_lineage_with_foo() -> PackageLineage {
    let mut paths = BTreeMap::new();
    paths.insert(PathBuf::from("foo"), PathState::default());
    PackageLineage {
        paths,
        remote_uri: Some(first_push_uri()),
        ..PackageLineage::default()
    }
}
/// Builds a `ManifestRow` with logical key `foo` that copies the hash, size
/// and physical key of the `source_key` record in `fixture`.
///
/// Panics (via `unwrap`) if `fixture` has no record for `source_key`.
fn row_from_fixture(fixture: &Manifest, source_key: &str) -> ManifestRow {
    let lookup_key = PathBuf::from(source_key);
    let template = fixture.get_record(&lookup_key).unwrap();
    let hash = template.hash.clone();
    let physical_key = template.physical_key.clone();
    ManifestRow {
        logical_key: PathBuf::from("foo"),
        hash,
        size: template.size,
        physical_key,
        ..ManifestRow::default()
    }
}
/// Shared assertions for a successful first push of `("foo", "bar")`.
///
/// Checks that the push was certified latest, that the lineage's remote has a
/// non-empty hash and the expected namespace, that the pending commit was
/// cleared, and that `base_hash` and `latest_hash` both equal the pushed hash.
fn assert_first_push_of_foo_bar(push: &PushResult) -> Res {
    let lineage = &push.lineage;

    assert!(push.certified_latest);

    let pushed = lineage.remote()?;
    assert!(
        !pushed.hash.is_empty(),
        "pushed manifest should have a hash"
    );
    assert_eq!(pushed.namespace, ("foo", "bar").into());

    assert!(
        lineage.commit.is_none(),
        "publish should clear the pending commit after a successful push"
    );

    assert_eq!(
        lineage.base_hash, pushed.hash,
        "first push should pin base_hash to the uploaded revision"
    );
    assert_eq!(
        lineage.latest_hash, pushed.hash,
        "first push should pin latest_hash to the uploaded revision"
    );
    Ok(())
}
/// An added file leads to a commit followed by a successful first push.
#[test(tokio::test)]
async fn test_publish_commits_and_pushes_happy_path() -> Res {
    let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
    let added = row_from_fixture(&manifest_src, "0mb.bin");

    let (storage, remote) =
        setup_storages_for_commit_and_push(fixtures::objects::ZERO_HASH_HEX).await?;

    let mut changes = BTreeMap::new();
    changes.insert(PathBuf::from("foo"), Change::Added(added));
    let status = InstalledPackageStatus {
        changes,
        ..InstalledPackageStatus::default()
    };

    let mut manifest = Manifest::default();
    let paths = DomainPaths::new(PathBuf::from("/"));
    let commit_opts = CommitOptions {
        message: "published".to_string(),
        user_meta: None,
        workflow: None,
    };
    let outcome = publish_package(
        first_push_lineage_with_foo(),
        &mut manifest,
        &paths,
        &storage,
        &remote,
        PathBuf::from("/working-dir"),
        status,
        ("foo", "bar").into(),
        HostConfig::default(),
        commit_opts,
    )
    .await?;

    let PublishOutcome::CommittedAndPushed(push) = &outcome else {
        panic!("expected CommittedAndPushed, got PushedOnly");
    };
    assert_first_push_of_foo_bar(push)
}
/// A modified file leads to a commit followed by a successful first push.
#[test(tokio::test)]
async fn test_publish_modifies_file_and_pushes() -> Res {
    let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
    let modified = row_from_fixture(&manifest_src, "less-then-8mb.txt");

    // Local working copy holds the new content of `foo`.
    let storage = MockStorage::default();
    let local_file = PathBuf::from("/working-dir/foo");
    storage
        .write_byte_stream(
            local_file,
            ByteStream::from_static(fixtures::objects::less_than_8mb()),
        )
        .await?;

    // The remote's storage holds the same content under its object hash.
    let remote = MockRemote::default();
    let hash_hex = fixtures::objects::LESS_THAN_8MB_HASH_HEX;
    let remote_object = PathBuf::from(format!("/.quilt/objects/{hash_hex}"));
    remote
        .storage
        .write_byte_stream(
            remote_object,
            ByteStream::from_static(fixtures::objects::less_than_8mb()),
        )
        .await?;

    let mut changes = BTreeMap::new();
    changes.insert(PathBuf::from("foo"), Change::Modified(modified));
    let status = InstalledPackageStatus {
        changes,
        ..InstalledPackageStatus::default()
    };

    // The manifest starts with `foo` carrying the `0mb.bin` fixture data.
    let mut manifest = Manifest::default();
    let previous_row = row_from_fixture(&manifest_src, "0mb.bin");
    manifest.insert_record(previous_row).await?;

    let paths = DomainPaths::new(PathBuf::from("/"));
    let commit_opts = CommitOptions {
        message: "modified".to_string(),
        user_meta: None,
        workflow: None,
    };
    let outcome = publish_package(
        first_push_lineage_with_foo(),
        &mut manifest,
        &paths,
        &storage,
        &remote,
        PathBuf::from("/working-dir"),
        status,
        ("foo", "bar").into(),
        HostConfig::default(),
        commit_opts,
    )
    .await?;

    let PublishOutcome::CommittedAndPushed(push) = &outcome else {
        panic!("expected CommittedAndPushed, got PushedOnly");
    };
    assert_first_push_of_foo_bar(push)
}
/// A removed file leads to a commit and a successful first push, after which
/// the lineage no longer tracks the removed path.
#[test(tokio::test)]
async fn test_publish_removes_file_and_pushes() -> Res {
    let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
    let existing = row_from_fixture(&manifest_src, "0mb.bin");

    let (storage, remote) =
        setup_storages_for_commit_and_push(fixtures::objects::ZERO_HASH_HEX).await?;

    let mut changes = BTreeMap::new();
    changes.insert(PathBuf::from("foo"), Change::Removed(existing.clone()));
    let status = InstalledPackageStatus {
        changes,
        ..InstalledPackageStatus::default()
    };

    let mut manifest = Manifest::default();
    manifest.insert_record(existing).await?;

    let paths = DomainPaths::new(PathBuf::from("/"));
    let commit_opts = CommitOptions {
        message: "removed".to_string(),
        user_meta: None,
        workflow: None,
    };
    let outcome = publish_package(
        first_push_lineage_with_foo(),
        &mut manifest,
        &paths,
        &storage,
        &remote,
        PathBuf::from("/working-dir"),
        status,
        ("foo", "bar").into(),
        HostConfig::default(),
        commit_opts,
    )
    .await?;

    let PublishOutcome::CommittedAndPushed(push) = &outcome else {
        panic!("expected CommittedAndPushed, got PushedOnly");
    };
    assert_first_push_of_foo_bar(push)?;

    let removed_key = PathBuf::from("foo");
    assert!(
        !push.lineage.paths.contains_key(&removed_key),
        "lineage.paths should no longer track the removed file"
    );
    Ok(())
}
/// An added file published with user metadata in the commit options is
/// committed and then pushed as a first push.
#[test(tokio::test)]
async fn test_publish_with_meta_and_pushes() -> Res {
    let manifest_src = fixtures::manifest_with_objects_all_sizes::manifest().await?;
    let added = row_from_fixture(&manifest_src, "0mb.bin");

    let (storage, remote) =
        setup_storages_for_commit_and_push(fixtures::objects::ZERO_HASH_HEX).await?;

    let mut changes = BTreeMap::new();
    changes.insert(PathBuf::from("foo"), Change::Added(added));
    let status = InstalledPackageStatus {
        changes,
        ..InstalledPackageStatus::default()
    };

    let mut manifest = Manifest::default();
    let paths = DomainPaths::new(PathBuf::from("/"));
    let commit_opts = CommitOptions {
        message: "Lorem ipsum".to_string(),
        user_meta: Some(serde_json::json!({"lorem": "ipsum"})),
        workflow: None,
    };
    let outcome = publish_package(
        first_push_lineage_with_foo(),
        &mut manifest,
        &paths,
        &storage,
        &remote,
        PathBuf::from("/working-dir"),
        status,
        ("foo", "bar").into(),
        HostConfig::default(),
        commit_opts,
    )
    .await?;

    let PublishOutcome::CommittedAndPushed(push) = &outcome else {
        panic!("expected CommittedAndPushed, got PushedOnly");
    };
    assert_first_push_of_foo_bar(push)
}
}