use futures::stream::{StreamExt, TryStreamExt};
use time::OffsetDateTime;
use tracing::warn;
use crate::git::RefName;
use crate::keys;
use crate::object_store::{ObjectStore, ObjectStoreError};
use crate::protocol::fetch::MAX_FETCH_CONCURRENCY;
use super::PackchainError;
use super::schema::ChainManifest;
/// One ref surfaced by `list_refs`, assembled from that ref's `chain.json`
/// object and its listing metadata.
#[derive(Debug, Clone)]
pub(crate) struct ChainRef {
/// The `tip` of the parsed chain manifest, converted into a `String`.
pub(crate) sha: String,
/// Ref path derived from the `chain.json` object key (for example
/// `refs/heads/main`); it has passed `RefName::is_valid`.
pub(crate) ref_path: String,
/// `last_modified` as reported by the object-store listing for the
/// `chain.json` object.
pub(crate) last_modified: OffsetDateTime,
}
/// Lists every ref under `prefix` that has a readable, parseable
/// `chain.json`, newest first.
///
/// The candidate keys come from `store.list` on the key built by
/// `keys::join(prefix, "refs/")`. They are then filtered and resolved:
///
/// * keys rejected by `is_chain_json_key` are ignored silently;
/// * keys whose ref path cannot be derived, or whose derived path fails
///   `RefName::is_valid`, are skipped with a warning;
/// * a `chain.json` whose fetch returns `NotFound` (it was listed but is
///   gone by the time it is read) is skipped with a warning;
/// * a `chain.json` rejected by `ChainManifest::from_json_bytes` is skipped
///   with a warning.
///
/// Each surviving entry reports the manifest's `tip` as its `sha`.
///
/// # Ordering
///
/// Entries are sorted by `last_modified` descending. Entries with equal
/// timestamps are ordered by `ref_path` ascending, so the result does not
/// depend on the completion order of the concurrent fetches.
///
/// # Errors
///
/// Returns an error if the listing itself fails, or if fetching any
/// `chain.json` fails with anything other than `NotFound`.
pub(crate) async fn list_refs(
    store: &dyn ObjectStore,
    prefix: Option<&str>,
) -> Result<Vec<ChainRef>, PackchainError> {
    let refs_prefix = keys::join(prefix, "refs/");
    let metas = store.list(&refs_prefix).await?;

    // Phase 1: keep only `chain.json` keys that map to a valid ref name.
    let candidates: Vec<(String, String, OffsetDateTime)> = metas
        .into_iter()
        .filter_map(|m| {
            if !super::keys::is_chain_json_key(&m.key) {
                return None;
            }
            let Some(ref_path) = super::keys::ref_path_from_chain_key(prefix, &m.key) else {
                warn!(key = %m.key, "packchain list: chain.json key has unexpected shape; skipping");
                return None;
            };
            if !RefName::is_valid(&ref_path) {
                warn!(
                    key = %m.key,
                    ref_path = %ref_path,
                    "packchain list: derived ref path is not a valid ref name; skipping",
                );
                return None;
            }
            Some((m.key, ref_path, m.last_modified))
        })
        .collect();

    // Phase 2: fetch the manifests with bounded concurrency. A `NotFound`
    // is tolerated (mapped to `None`); any other store error aborts the
    // whole listing through `try_collect` + `?`.
    let bodies: Vec<(String, String, OffsetDateTime, bytes::Bytes)> =
        futures::stream::iter(candidates)
            .map(|(key, ref_path, last_modified)| async move {
                match store.get_bytes(&key).await {
                    Ok(body) => Ok(Some((key, ref_path, last_modified, body))),
                    Err(ObjectStoreError::NotFound(_)) => {
                        warn!(
                            key = %key,
                            "packchain list: chain.json vanished between list and get \
                             (concurrent delete?); skipping",
                        );
                        Ok(None)
                    }
                    Err(e) => Err(PackchainError::Store(e)),
                }
            })
            .buffer_unordered(MAX_FETCH_CONCURRENCY)
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .flatten()
            .collect();

    // Phase 3: parse each manifest; an unparseable one only drops its ref.
    let mut out: Vec<ChainRef> = bodies
        .into_iter()
        .filter_map(|(key, ref_path, last_modified, body)| {
            match ChainManifest::from_json_bytes(&body) {
                Ok(chain) => Some(ChainRef {
                    sha: chain.tip.into(),
                    ref_path,
                    last_modified,
                }),
                Err(e) => {
                    warn!(
                        key = %key,
                        error = %e,
                        "packchain list: chain.json failed to parse; skipping ref",
                    );
                    None
                }
            }
        })
        .collect();

    // Newest first. `buffer_unordered` hands bodies back in completion
    // order, so sorting on the timestamp alone would leave equal-timestamp
    // entries in an arbitrary, run-dependent order; the `ref_path`
    // tie-break makes the output deterministic.
    out.sort_by(|a, b| {
        b.last_modified
            .cmp(&a.last_modified)
            .then_with(|| a.ref_path.cmp(&b.ref_path))
    });
    Ok(out)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::git::RefName;
use crate::object_store::mock::MockStore;
use crate::packchain::manifest::write_chain;
use crate::packchain::schema::{ChainSegment, Sha40};
use bytes::Bytes;
// Fixture object-id strings shared by the tests in this module. The tests
// pass them to `sha40` / `write_test_chain` as tip and full_at values;
// `SHA_PACK` only appears in the pack path built by `write_test_chain`.
const SHA_TIP: &str = "0000000000000000000000000000000000000001";
const SHA_FULL: &str = "0000000000000000000000000000000000000002";
const SHA_PACK: &str = "1111111111111111111111111111111111111111";
const SHA_TIP_DEV: &str = "0000000000000000000000000000000000000003";
/// Builds a `Sha40` from a fixture string, panicking if `Sha40::try_new`
/// rejects it.
fn sha40(hex: &str) -> Sha40 {
    let parsed = Sha40::try_new(hex);
    parsed.unwrap()
}
/// Builds a `RefName` from a fixture string, panicking if `RefName::new`
/// rejects it.
fn ref_(raw: &str) -> RefName {
    let parsed = RefName::new(raw);
    parsed.unwrap()
}
/// Writes a single-segment, version-1 chain manifest for `ref_name` into
/// `store` via `write_chain`, panicking if the write fails.
///
/// The lone segment points at `tip`, has no parent, and references the
/// fixed fixture pack `packs/{SHA_PACK}.pack`.
async fn write_test_chain(
    store: &MockStore,
    prefix: Option<&str>,
    ref_name: &RefName,
    tip: &str,
    full_at: &str,
) {
    let tip_sha = sha40(tip);
    let full_at_sha = sha40(full_at);
    let only_segment = ChainSegment {
        sha: sha40(tip),
        parent_sha: None,
        pack: format!("packs/{SHA_PACK}.pack"),
        bytes: 1_024,
    };
    let manifest = ChainManifest {
        v: 1,
        tip: tip_sha,
        full_at: full_at_sha,
        segments: vec![only_segment],
    };
    let written = write_chain(store, prefix, ref_name, &manifest).await;
    written.unwrap();
}
/// A chain whose `tip` and `full_at` differ must be listed with `tip` as
/// its sha.
#[tokio::test]
async fn list_refs_returns_chain_tip_not_full_at_after_incremental_push() {
    let store = MockStore::new();
    let main = ref_("refs/heads/main");
    write_test_chain(&store, Some("repo"), &main, SHA_TIP, SHA_FULL).await;

    let listed = list_refs(&store, Some("repo")).await.unwrap();

    assert_eq!(listed.len(), 1);
    let only = &listed[0];
    assert_eq!(only.sha, SHA_TIP, "must report chain.tip");
    assert_ne!(only.sha, SHA_FULL, "must NOT report full_at");
    assert_eq!(only.ref_path, "refs/heads/main");
}
/// Listing a store that holds no objects yields an empty vector, not an
/// error.
#[tokio::test]
async fn list_refs_empty_bucket_returns_empty_vec() {
    let empty_store = MockStore::new();

    let listed = list_refs(&empty_store, Some("repo")).await.unwrap();

    assert!(listed.is_empty());
}
/// Two branches written under the same prefix are both listed, each with
/// its own tip.
#[tokio::test]
async fn list_refs_collects_multiple_branches() {
    let store = MockStore::new();
    for (name, tip, full_at) in [
        ("refs/heads/main", SHA_TIP, SHA_FULL),
        ("refs/heads/dev", SHA_TIP_DEV, SHA_TIP_DEV),
    ] {
        write_test_chain(&store, Some("repo"), &ref_(name), tip, full_at).await;
    }

    let listed = list_refs(&store, Some("repo")).await.unwrap();

    // Index by ref path so the assertions do not depend on listing order.
    let tip_by_ref: std::collections::HashMap<&str, &str> = listed
        .iter()
        .map(|entry| (entry.ref_path.as_str(), entry.sha.as_str()))
        .collect();
    assert_eq!(tip_by_ref.get("refs/heads/main").copied(), Some(SHA_TIP));
    assert_eq!(tip_by_ref.get("refs/heads/dev").copied(), Some(SHA_TIP_DEV));
}
/// A branch name containing a slash (`feature/foo`) is listed with its
/// full ref path intact.
#[tokio::test]
async fn list_refs_handles_nested_branch_names() {
    let store = MockStore::new();
    let nested = ref_("refs/heads/feature/foo");
    write_test_chain(&store, Some("repo"), &nested, SHA_TIP, SHA_TIP).await;

    let listed = list_refs(&store, Some("repo")).await.unwrap();

    assert_eq!(listed.len(), 1);
    let only = &listed[0];
    assert_eq!(only.ref_path, "refs/heads/feature/foo");
    assert_eq!(only.sha, SHA_TIP);
}
/// With `prefix = None` for both the write and the listing, the ref is
/// still found and reported with its plain ref path.
#[tokio::test]
async fn list_refs_works_at_bucket_root_with_no_prefix() {
    let store = MockStore::new();
    let main = ref_("refs/heads/main");
    write_test_chain(&store, None, &main, SHA_TIP, SHA_TIP).await;

    let listed = list_refs(&store, None).await.unwrap();

    assert_eq!(listed.len(), 1);
    let only = &listed[0];
    assert_eq!(only.ref_path, "refs/heads/main");
    assert_eq!(only.sha, SHA_TIP);
}
/// A `chain.json` holding invalid JSON drops only its own ref; the healthy
/// ref next to it is still listed.
#[tokio::test]
async fn list_refs_skips_corrupt_chain_json_with_warning() {
    let store = MockStore::new();
    let healthy = ref_("refs/heads/main");
    write_test_chain(&store, Some("repo"), &healthy, SHA_TIP, SHA_TIP).await;

    let garbage = Bytes::from_static(b"{not valid json");
    store.insert("repo/refs/heads/broken/chain.json", garbage);

    let listed = list_refs(&store, Some("repo")).await.unwrap();

    assert_eq!(listed.len(), 1, "corrupt branch must be skipped");
    assert_eq!(listed[0].ref_path, "refs/heads/main");
}
/// A manifest declaring `"v":2` is skipped rather than failing the whole
/// listing.
#[tokio::test]
async fn list_refs_skips_unsupported_schema_version() {
    const FUTURE_MANIFEST: &[u8] = br#"{"v":2,"tip":"0000000000000000000000000000000000000001","full_at":"0000000000000000000000000000000000000001","segments":[]}"#;

    let store = MockStore::new();
    store.insert(
        "repo/refs/heads/future/chain.json",
        Bytes::from_static(FUTURE_MANIFEST),
    );

    let listed = list_refs(&store, Some("repo")).await.unwrap();

    assert!(
        listed.is_empty(),
        "unsupported schema must skip rather than abort",
    );
}
/// The ref written last is listed first, regardless of alphabetical order.
#[tokio::test]
async fn list_refs_orders_newest_chain_first() {
    let store = MockStore::new();

    let older = ref_("refs/heads/dev");
    write_test_chain(&store, Some("repo"), &older, SHA_TIP_DEV, SHA_TIP_DEV).await;

    // Short pause between the two writes so they are not issued back to back.
    let gap = std::time::Duration::from_millis(2);
    tokio::time::sleep(gap).await;

    let newer = ref_("refs/heads/main");
    write_test_chain(&store, Some("repo"), &newer, SHA_TIP, SHA_TIP).await;

    let listed = list_refs(&store, Some("repo")).await.unwrap();

    assert_eq!(listed.len(), 2);
    let (first, second) = (&listed[0], &listed[1]);
    assert_eq!(
        first.ref_path, "refs/heads/main",
        "newest chain.json (last_modified desc) must come first, \
         even though `dev` is alphabetically earlier",
    );
    assert_eq!(second.ref_path, "refs/heads/dev");
}
/// With a matching prefix, the helper returns the key minus the prefix and
/// the trailing `chain.json` component.
#[test]
fn ref_path_from_chain_key_strips_prefix_and_suffix() {
    use super::super::keys::ref_path_from_chain_key;

    let derived = ref_path_from_chain_key(Some("repo"), "repo/refs/heads/main/chain.json");

    assert_eq!(derived, Some("refs/heads/main".to_owned()));
}
/// `None` and `Some("")` are both treated as "no prefix" and give the same
/// ref path.
#[test]
fn ref_path_from_chain_key_handles_no_prefix() {
    use super::super::keys::ref_path_from_chain_key;

    for no_prefix in [None, Some("")] {
        let derived = ref_path_from_chain_key(no_prefix, "refs/heads/main/chain.json");
        assert_eq!(derived, Some("refs/heads/main".to_owned()));
    }
}
/// A key under `repo-other/` must not match the prefix `repo`, even though
/// it starts with the same characters.
#[test]
fn ref_path_from_chain_key_returns_none_for_unrelated_key() {
    use super::super::keys::ref_path_from_chain_key;

    let derived =
        ref_path_from_chain_key(Some("repo"), "repo-other/refs/heads/main/chain.json");

    assert_eq!(derived, None);
}
/// A well-formed manifest stored under a key containing `..` must not
/// reach the listing; the legitimate ref next to it still does.
#[tokio::test]
async fn list_refs_skips_chain_json_with_path_traversal_in_ref_name() {
    let store = MockStore::new();
    let legit = ref_("refs/heads/main");
    write_test_chain(&store, Some("repo"), &legit, SHA_TIP, SHA_TIP).await;

    // The body itself is valid, so only the key shape can cause the skip.
    let hostile_body =
        format!(r#"{{"v":1,"tip":"{SHA_TIP}","full_at":"{SHA_TIP}","segments":[]}}"#);
    store.insert(
        "repo/refs/heads/../etc/passwd/chain.json",
        Bytes::from(hostile_body.into_bytes()),
    );

    let listed = list_refs(&store, Some("repo")).await.unwrap();

    assert_eq!(
        listed.len(),
        1,
        "malicious ref path must be filtered before emission",
    );
    assert_eq!(listed[0].ref_path, "refs/heads/main");
    assert!(
        listed.iter().all(|entry| !entry.ref_path.contains("..")),
        "no entry with `..` in ref_path may reach the list output",
    );
}
/// A chain stored under `refs/tags/` is listed like any branch.
#[tokio::test]
async fn list_refs_surfaces_tag_chain() {
    let store = MockStore::new();
    let tag = ref_("refs/tags/v1");
    write_test_chain(&store, Some("repo"), &tag, SHA_TIP, SHA_TIP).await;

    let listed = list_refs(&store, Some("repo")).await.unwrap();

    assert_eq!(listed.len(), 1);
    let only = &listed[0];
    assert_eq!(only.ref_path, "refs/tags/v1");
    assert_eq!(only.sha, SHA_TIP);
}
/// A chain stored under `refs/notes/` is listed like any branch.
#[tokio::test]
async fn list_refs_surfaces_notes_chain() {
    let store = MockStore::new();
    let notes = ref_("refs/notes/commits");
    write_test_chain(&store, Some("repo"), &notes, SHA_TIP, SHA_TIP).await;

    let listed = list_refs(&store, Some("repo")).await.unwrap();

    assert_eq!(listed.len(), 1);
    let only = &listed[0];
    assert_eq!(only.ref_path, "refs/notes/commits");
    assert_eq!(only.sha, SHA_TIP);
}
/// Chains under `refs/heads/`, `refs/tags/` and `refs/notes/` are all
/// returned by one listing, each with its own tip.
#[tokio::test]
async fn list_refs_collects_chains_from_mixed_namespaces() {
    let store = MockStore::new();
    for (name, tip, full_at) in [
        ("refs/heads/main", SHA_TIP, SHA_FULL),
        ("refs/tags/v1", SHA_TIP_DEV, SHA_TIP_DEV),
        ("refs/notes/commits", SHA_FULL, SHA_FULL),
    ] {
        write_test_chain(&store, Some("repo"), &ref_(name), tip, full_at).await;
    }

    let listed = list_refs(&store, Some("repo")).await.unwrap();

    // Index by ref path so the assertions do not depend on listing order.
    let tip_by_ref: std::collections::HashMap<&str, &str> = listed
        .iter()
        .map(|entry| (entry.ref_path.as_str(), entry.sha.as_str()))
        .collect();
    assert_eq!(tip_by_ref.len(), 3, "all three namespaces must appear");
    assert_eq!(tip_by_ref.get("refs/heads/main").copied(), Some(SHA_TIP));
    assert_eq!(tip_by_ref.get("refs/tags/v1").copied(), Some(SHA_TIP_DEV));
    assert_eq!(
        tip_by_ref.get("refs/notes/commits").copied(),
        Some(SHA_FULL),
    );
}
/// Objects stored beside a tag's `chain.json` (a `path-index.json` and a
/// `.bundle`) do not produce extra listing entries.
#[tokio::test]
async fn list_refs_ignores_non_chain_siblings_under_tag_namespace() {
    let store = MockStore::new();
    let tag = ref_("refs/tags/v1");
    write_test_chain(&store, Some("repo"), &tag, SHA_TIP, SHA_TIP).await;

    let index_body = format!(r#"{{"v":1,"commit":"{SHA_TIP}","tree":{{}}}}"#);
    store.insert(
        "repo/refs/tags/v1/path-index.json",
        Bytes::from(index_body.into_bytes()),
    );
    let bundle_key = format!("repo/refs/tags/v1/{SHA_TIP}.bundle");
    store.insert(bundle_key, Bytes::from_static(b"baseline"));

    let listed = list_refs(&store, Some("repo")).await.unwrap();

    assert_eq!(listed.len(), 1, "exactly one chain.json processed");
    let only = &listed[0];
    assert_eq!(only.ref_path, "refs/tags/v1");
    assert_eq!(only.sha, SHA_TIP);
}
/// Boxed one-shot callback that receives the wrapped `MockStore`.
type PostListHook = Box<dyn FnOnce(&MockStore) + Send>;

/// Test double wrapping a `MockStore` together with a pending hook.
///
/// The `ObjectStore::list` implementation for this type takes the hook out
/// and runs it after a successful inner `list`, which lets a test mutate
/// the store between the listing and the follow-up reads.
struct PostListDeleteStore {
    /// The real store that backs every operation.
    inner: MockStore,
    /// The pending hook; `None` once it has been taken.
    hook: std::sync::Mutex<Option<PostListHook>>,
}

impl PostListDeleteStore {
    /// Wraps `inner` and stores `hook` as the pending callback.
    fn new(inner: MockStore, hook: impl FnOnce(&MockStore) + Send + 'static) -> Self {
        let boxed: PostListHook = Box::new(hook);
        let pending = std::sync::Mutex::new(Some(boxed));
        Self {
            inner,
            hook: pending,
        }
    }
}
// `ObjectStore` implementation for the test double, generated through the
// crate's `delegate_to_inner_impl!` macro. The names after `forward:` are
// presumably passed straight through to `self.inner` (the macro body is not
// visible here — confirm against its definition); only `list` is written
// out by hand.
crate::delegate_to_inner_impl! {
impl ObjectStore for PostListDeleteStore {
forward: get_to_file, get_bytes, get_bytes_range,
put_bytes, put_path, put_if_absent,
head, copy, delete;
// Lists through the inner store, then fires the pending hook.
async fn list(
&self,
prefix: &str,
) -> Result<Vec<crate::object_store::ObjectMeta>, ObjectStoreError> {
let result = self.inner.list(prefix).await;
// Only a successful listing triggers the hook. `take()` leaves `None`
// behind, so the hook runs at most once even if `list` is called again.
if result.is_ok()
&& let Some(hook) = self.hook.lock().unwrap().take()
{
hook(&self.inner);
}
// The caller still receives the listing captured before the hook ran.
result
}
}
}
/// If a `chain.json` is removed after the listing but before it is read,
/// that ref is dropped and the remaining ref is still returned.
#[tokio::test]
async fn list_refs_skips_chain_json_vanished_between_list_and_get() {
    let backing = MockStore::new();
    for (name, tip) in [
        ("refs/heads/main", SHA_TIP),
        ("refs/heads/dev", SHA_TIP_DEV),
    ] {
        write_test_chain(&backing, Some("repo"), &ref_(name), tip, tip).await;
    }

    // The hook runs right after the inner `list` succeeds and removes dev's
    // manifest before `list_refs` gets to fetch it.
    let doomed_key = "repo/refs/heads/dev/chain.json";
    let store = PostListDeleteStore::new(backing, move |inner| {
        let removed = inner.remove_key(doomed_key);
        assert!(
            removed,
            "concurrent delete must remove the targeted chain.json",
        );
    });

    let listed = list_refs(&store, Some("repo"))
        .await
        .expect("NotFound on one ref must not abort the listing");

    assert_eq!(
        listed.len(),
        1,
        "surviving ref must still surface; vanished ref is silently skipped",
    );
    let survivor = &listed[0];
    assert_eq!(
        survivor.ref_path, "refs/heads/main",
        "main must be the surviving entry — dev's chain.json was deleted mid-list",
    );
    assert_eq!(survivor.sha, SHA_TIP);
}
/// Objects stored beside a branch's `chain.json` (a `path-index.json` and
/// a `.bundle`) do not produce extra listing entries.
#[tokio::test]
async fn list_refs_ignores_path_index_and_baseline_bundle_siblings() {
    let store = MockStore::new();
    let main = ref_("refs/heads/main");
    write_test_chain(&store, Some("repo"), &main, SHA_TIP, SHA_TIP).await;

    let index_body = format!(r#"{{"v":1,"commit":"{SHA_TIP}","tree":{{}}}}"#);
    store.insert(
        "repo/refs/heads/main/path-index.json",
        Bytes::from(index_body.into_bytes()),
    );
    let bundle_key = format!("repo/refs/heads/main/{SHA_TIP}.bundle");
    store.insert(bundle_key, Bytes::from_static(b"baseline"));

    let listed = list_refs(&store, Some("repo")).await.unwrap();

    assert_eq!(listed.len(), 1, "exactly one chain.json processed");
    let only = &listed[0];
    assert_eq!(only.ref_path, "refs/heads/main");
    assert_eq!(only.sha, SHA_TIP);
}
}