//! Migration of Icechunk repositories from spec version 1 to spec version 2.
//!
//! The entry point is [`migrate_1_to_2`]: it validates that the repository is
//! a writable V1 repo, collects all refs and reachable snapshots, synthesizes
//! an ops log that reconstructs the repository's history, and writes the V2
//! repository info file, optionally deleting the now unused V1 files.
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Instant,
};
use async_stream::try_stream;
use chrono::{DateTime, TimeDelta, Utc};
use futures::{Stream, StreamExt as _, TryStreamExt as _, stream};
use icechunk_types::{ICResultExt as _, error::ICResultCtxExt as _};
use thiserror::Error;
use tokio::io::AsyncReadExt as _;
use tracing::{debug, error, info, warn};
use crate::{
    Repository,
    error::ICError,
    format::{
        CONFIG_FILE_PATH, IcechunkFormatErrorKind, REPO_INFO_FILE_PATH, SnapshotId,
        V1_REFS_FILE_PATH,
        format_constants::SpecVersionBin,
        repo_info::{RepoAvailability, RepoInfo, RepoStatus, UpdateInfo, UpdateType},
        snapshot::SnapshotInfo,
    },
    refs::{Ref, RefData, RefErrorKind, RefResult, list_deleted_tags, list_refs},
    repository::{RepositoryErrorKind, VersionInfo},
    storage::StorageErrorKind,
};

/// Offset added to synthesized events (branch/tag creations and deletions) so
/// they sort after the commit they refer to in the reconstructed ops log.
const SYNTHETIC_EVENT_OFFSET: TimeDelta = TimeDelta::milliseconds(1);
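/// Errors that can occur while migrating a repository from spec V1 to V2.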
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum MigrationErrorKind {
#[error(transparent)]
RepositoryError(#[from] RepositoryErrorKind),
#[error(transparent)]
RefError(#[from] RefErrorKind),
#[error(transparent)]
FormatError(#[from] IcechunkFormatErrorKind),
#[error(transparent)]
StorageError(#[from] StorageErrorKind),
    #[error(
        "invalid repository version ({actual}): this function can only migrate repositories from spec version {expected} to spec version {target}"
    )]
InvalidRepositoryMigration {
expected: SpecVersionBin,
target: SpecVersionBin,
actual: SpecVersionBin,
},
    #[error(
        "the storage instance used to initialize this repo is read-only, but the migration process needs to write to the object store"
    )]
ReadonlyRepo,
    #[error(
        "an unknown error prevented this repo from migrating, please file an issue report with the Icechunk maintainers: https://github.com/earth-mover/icechunk/issues"
    )]
Unknown,
}
pub type MigrationError = ICError<MigrationErrorKind>;
pub type MigrationResult<A> = Result<A, MigrationError>;
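/// Checks that the repository can be migrated: it must currently be a spec V1
/// repository and its storage must be writable.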
async fn validate_start(repo: &Repository) -> MigrationResult<()> {
if repo.spec_version() != SpecVersionBin::V1 {
error!("Target repository must be a 1.X Icechunk repository");
return Err(MigrationErrorKind::InvalidRepositoryMigration {
expected: SpecVersionBin::V1,
target: SpecVersionBin::V2,
actual: repo.spec_version(),
})
.capture();
}
if !repo.storage().can_write().await.inject()? {
error!("Storage instance must be writable");
return Err(MigrationError::capture(MigrationErrorKind::ReadonlyRepo));
}
Ok(())
}
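/// Best-effort lookup of the snapshot a deleted tag pointed to, by reading the
/// tag's original `ref.json`, which is expected to still be present next to
/// the `ref.json.deleted` marker. Returns `None` on any failure.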
async fn fetch_deleted_tag_snapshot_id(
repo: &Repository,
tag_name: &str,
) -> Option<SnapshotId> {
let ref_path = format!("{V1_REFS_FILE_PATH}/tag.{tag_name}/ref.json");
match repo.storage().get_object(repo.storage_settings(), &ref_path, None).await {
Ok((mut reader, ..)) => {
let mut data = Vec::with_capacity(40);
if reader.read_to_end(&mut data).await.is_err() {
return None;
}
let ref_data: RefData = serde_json::from_slice(&data).ok()?;
Some(ref_data.snapshot)
}
Err(_) => None,
}
}
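/// Reconstructs a plausible ops log for the V1 repository's history.
///
/// Commits keep their real `flushed_at` timestamps; branch/tag creations and
/// tag deletions have no recorded time in V1, so they are synthesized at the
/// referenced snapshot's time plus [`SYNTHETIC_EVENT_OFFSET`]. The result is
/// ordered newest-first with strictly decreasing timestamps.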
fn generate_migration_ops_log(
main_ancestry: &[SnapshotInfo],
branches: &[(&str, Vec<SnapshotInfo>)],
tags: &[(&str, SnapshotId)],
deleted_tags: &[(&str, SnapshotId)],
all_snapshots: &[SnapshotInfo],
) -> Vec<(UpdateType, DateTime<Utc>)> {
if main_ancestry.is_empty() {
return Vec::new();
}
let snap_times: HashMap<&SnapshotId, DateTime<Utc>> =
all_snapshots.iter().map(|s| (&s.id, s.flushed_at)).collect();
    // Ancestry is ordered tip-first, so the last element is the root snapshot.
    let root = &main_ancestry[main_ancestry.len() - 1];
    let root_time = root.flushed_at;
let mut emitted_snap_ids: HashSet<_> =
main_ancestry.iter().map(|s| s.id.clone()).collect();
let capacity =
1 + all_snapshots.len() + branches.len() + tags.len() + 2 * deleted_tags.len();
let mut entries: Vec<(UpdateType, DateTime<Utc>)> = Vec::with_capacity(capacity);
entries.push((UpdateType::RepoInitializedUpdate, root_time));
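    // Main-branch commits, oldest first; the root snapshot is skipped because
    // it is represented by the RepoInitializedUpdate above.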
for snap in main_ancestry.iter().rev().skip(1) {
entries.push((
UpdateType::NewCommitUpdate {
branch: Ref::DEFAULT_BRANCH.to_string(),
new_snap_id: snap.id.clone(),
},
snap.flushed_at,
));
}
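    // Other branches: synthesize a creation event just after the branch tip,
    // and emit any commits that were not already part of main's ancestry.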
for (name, ancestry) in branches {
let branch_tip_time = ancestry.first().map(|s| s.flushed_at).unwrap_or(root_time);
entries.push((
UpdateType::BranchCreatedUpdate { name: name.to_string() },
branch_tip_time + SYNTHETIC_EVENT_OFFSET,
));
for snap in ancestry.iter().rev() {
if emitted_snap_ids.insert(snap.id.clone()) {
entries.push((
UpdateType::NewCommitUpdate {
branch: name.to_string(),
new_snap_id: snap.id.clone(),
},
snap.flushed_at,
));
}
}
}
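    // Tags: synthesize a creation event just after the tagged snapshot.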
for (name, snap_id) in tags {
let time = snap_times.get(snap_id).copied().unwrap_or(root_time);
entries.push((
UpdateType::TagCreatedUpdate { name: name.to_string() },
time + SYNTHETIC_EVENT_OFFSET,
));
}
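    // Deleted tags: synthesize a creation event followed, one offset later,
    // by the deletion event.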
for (name, snap_id) in deleted_tags {
let time = snap_times.get(snap_id).copied().unwrap_or(root_time);
entries.push((
UpdateType::TagCreatedUpdate { name: name.to_string() },
time + SYNTHETIC_EVENT_OFFSET,
));
entries.push((
UpdateType::TagDeletedUpdate {
name: name.to_string(),
previous_snap_id: snap_id.clone(),
},
time + SYNTHETIC_EVENT_OFFSET + SYNTHETIC_EVENT_OFFSET,
));
}
entries.sort_by_key(|(_, time)| *time);
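    // Timestamps may still collide (synthetic events share offsets); bump any
    // non-increasing timestamp by a microsecond so the log is strictly
    // increasing, then reverse it into the newest-first order callers expect.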
for i in 1..entries.len() {
let prev_time = entries[i - 1].1;
if entries[i].1 <= prev_time {
entries[i].1 = prev_time + TimeDelta::microseconds(1);
}
}
entries.reverse();
entries
}
/// Reopens the repository and checks that it now reports spec version V2,
/// rolling back the generated repo info file if it doesn't.
async fn verify_repo_opens_as_v2(repo: &Repository) -> MigrationResult<()> {
    info!("Opening migrated repo");
    let Ok(migrated) = Repository::open(
        Some(repo.config().clone()),
        Arc::clone(repo.storage()),
        Default::default(),
    )
    .await
    else {
        error!("Unknown error during migration. Repository doesn't open");
        delete_repo_info(repo).await?;
        error!("Migration failed");
        return Err(MigrationError::capture(MigrationErrorKind::Unknown));
    };
    if migrated.spec_version() != SpecVersionBin::V2 {
        error!("Unknown error during migration. Repository doesn't open as 2.0");
        delete_repo_info(repo).await?;
        error!("Migration failed");
        return Err(MigrationError::capture(MigrationErrorKind::Unknown));
    }
    Ok(())
}

/// Writes the V2 repository info file, verifies the repository opens as V2,
/// and optionally deletes the now unused V1 refs and `config.yaml`.
async fn do_migrate(
    repo: &Repository,
    repo_info: Arc<RepoInfo>,
    start_time: Instant,
    delete_unused_v1_files: bool,
) -> MigrationResult<()> {
    info!("Writing new repository info file");
    let new_asset_manager =
        repo.asset_manager().clone_for_spec_version(SpecVersionBin::V2);
    let new_version_info =
        new_asset_manager.create_repo_info(repo_info).await.inject()?;
    info!(version=?new_version_info, "Written repository info file");
    verify_repo_opens_as_v2(repo).await?;
    if delete_unused_v1_files {
        if let Err(err) = delete_v1_refs(repo).await {
            delete_repo_info(repo).await?;
            error!("Migration failed");
            return Err(err);
        }
        // Deleting the V1 refs must not have broken anything; check again.
        verify_repo_opens_as_v2(repo).await?;
        if let Err(err) = delete_config_yaml(repo).await {
            warn!("Failed to delete V1 config.yaml: {err}, continuing anyway");
        }
    }
    info!(
        "Migration completed in {} seconds, you can use the repository now",
        start_time.elapsed().as_secs()
    );
    Ok(())
}
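/// Migrates a spec V1 repository to spec V2.
///
/// Collects all refs and reachable snapshots, synthesizes an ops log for the
/// repository's history, and writes the V2 repository info file. With
/// `dry_run` the repository is left untouched. With `delete_unused_v1_files`
/// the V1 refs and `config.yaml` are removed once the migration is verified.
/// `prefetch_concurrency` bounds the background snapshot cache warming
/// (default 64).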
pub async fn migrate_1_to_2(
repo: Repository,
dry_run: bool,
delete_unused_v1_files: bool,
prefetch_concurrency: Option<usize>,
) -> MigrationResult<()> {
let prefetch_concurrency = prefetch_concurrency.unwrap_or(64);
let start_time = Instant::now();
validate_start(&repo).await?;
info!("Starting migration");
info!("Collecting refs");
let refs = all_roots(&repo).await.inject()?.try_collect::<Vec<_>>().await.inject()?;
let tags = Vec::from_iter(refs.iter().filter_map(|(r, id)| {
if r.is_tag() { Some((r.name(), id.clone())) } else { None }
}));
let branches = Vec::from_iter(refs.iter().filter_map(|(r, id)| {
if r.is_branch() { Some((r.name(), id.clone())) } else { None }
}));
let deleted_tags =
list_deleted_tags(repo.storage().as_ref(), repo.storage_settings())
.await
.inject()?;
info!(
"Found {} refs: {} tags, {} branches, {} deleted tags",
refs.len(),
tags.len(),
branches.len(),
deleted_tags.len()
);
let deleted_tag_names: Vec<&str> = deleted_tags
.iter()
.filter_map(|s| {
s.as_str()
.strip_prefix("tag.")
.and_then(|s| s.strip_suffix("/ref.json.deleted"))
})
.collect();
info!("Collecting non-dangling snapshots, this may take a few minutes");
let asset_manager = Arc::clone(repo.asset_manager());
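    // Best-effort cache warming: list and fetch snapshots in the background so
    // the ancestry walk below mostly hits the asset manager's cache. Failures
    // are logged and ignored, and the task is aborted once the walk finishes.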
let prefetch_handle = {
let am = Arc::clone(&asset_manager);
tokio::spawn(async move {
let concurrency = prefetch_concurrency;
match am.list_snapshots().await {
Ok(snapshot_list) => {
let mut snap_infos: Vec<_> = match snapshot_list
.try_collect::<Vec<_>>()
.await
{
Ok(v) => v,
Err(e) => {
warn!(
"Snapshot prefetch: failed to collect snapshot list: {e}"
);
return;
}
};
snap_infos.sort_by(|a, b| b.created_at.cmp(&a.created_at));
info!(
"Snapshot prefetch: warming cache for {} snapshots with concurrency {}",
snap_infos.len(),
concurrency,
);
let fetches = stream::iter(snap_infos.into_iter().map(|info| {
let am = Arc::clone(&am);
async move {
if let Err(e) = am.fetch_snapshot(&info.id).await {
debug!(
"Snapshot prefetch: failed to fetch {}: {e}",
info.id
);
}
}
}))
.buffer_unordered(concurrency)
.count()
.await;
info!("Snapshot prefetch: completed {fetches} fetches");
}
Err(e) => {
warn!("Snapshot prefetch: failed to list snapshots: {e}");
}
}
})
};
let snap_ids = refs.iter().map(|(_, id)| id);
let all_snapshots =
pointed_snapshots(&repo, snap_ids).await?.try_collect::<Vec<_>>().await?;
prefetch_handle.abort();
info!("Found {} non-dangling snapshots", all_snapshots.len());
info!("Generating migration ops log");
let snap_by_id: HashMap<&SnapshotId, &SnapshotInfo> =
all_snapshots.iter().map(|s| (&s.id, s)).collect();
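    // Rebuild each ref's ancestry (tip-first) from the snapshots we already
    // collected, avoiding another round of storage reads.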
let ancestry_from = |tip: &SnapshotId| -> Vec<SnapshotInfo> {
let mut result = Vec::new();
let mut current = snap_by_id.get(tip);
while let Some(snap) = current {
result.push((*snap).clone());
current = snap.parent_id.as_ref().and_then(|pid| snap_by_id.get(pid));
}
result
};
let main_snap_id =
branches.iter().find(|(name, _)| *name == Ref::DEFAULT_BRANCH).map(|(_, id)| id);
let main_ancestry = main_snap_id.map(&ancestry_from).unwrap_or_default();
let branch_ancestries: Vec<(&str, Vec<SnapshotInfo>)> = branches
.iter()
.filter(|(name, _)| *name != Ref::DEFAULT_BRANCH)
.map(|(name, snap_id)| (*name, ancestry_from(snap_id)))
.collect();
let mut deleted_tags_with_snap: Vec<(&str, SnapshotId)> = Vec::new();
for name in &deleted_tag_names {
if let Some(snap_id) = fetch_deleted_tag_snapshot_id(&repo, name).await {
deleted_tags_with_snap.push((name, snap_id));
} else {
warn!(
"Could not fetch snapshot ID for deleted tag '{name}', skipping ops log entry"
);
}
}
let ops_log = generate_migration_ops_log(
&main_ancestry,
&branch_ancestries,
&tags,
&deleted_tags_with_snap,
&all_snapshots,
);
info!("Generated {} ops log entries", ops_log.len());
let previous_updates: Vec<crate::format::IcechunkResult<_>> =
ops_log.into_iter().map(|(ut, dt)| Ok((ut, dt, None))).collect();
    // `num_updates_per_repo_info_file` is a u16; clamp instead of overflowing
    // if the synthesized history is unusually long.
    let migration_page_size = u16::try_from(previous_updates.len() + 1)
        .unwrap_or(u16::MAX)
        .max(repo.config().num_updates_per_repo_info_file());
info!("Creating repository info file");
let persisted_config = repo.asset_manager().fetch_config().await.inject()?;
let config_bytes: Option<Vec<u8>> = match persisted_config {
Some((config, _)) => Some(
flexbuffers::to_vec(&config)
.map_err(|e| {
IcechunkFormatErrorKind::SerializationErrorFlexBuffers(Box::new(e))
})
.capture()?,
),
None => None,
};
    // Ancestry is tip-first, so the root is the last element. A V1 repo always
    // has a main branch; fall back to the current time rather than panicking
    // if it is somehow missing.
    let root_time =
        main_ancestry.last().map(|s| s.flushed_at).unwrap_or_else(Utc::now);
let repo_info = Arc::new(
RepoInfo::new(
SpecVersionBin::V2,
tags,
branches,
deleted_tag_names.iter().copied(),
all_snapshots,
&Default::default(),
UpdateInfo {
update_type: UpdateType::RepoMigratedUpdate {
from_version: SpecVersionBin::V1,
to_version: SpecVersionBin::V2,
},
update_time: Utc::now(),
previous_updates,
},
None,
migration_page_size,
None,
config_bytes.as_deref(),
None::<std::iter::Empty<u16>>,
None::<std::iter::Empty<u16>>,
&RepoStatus {
availability: RepoAvailability::Online,
set_at: root_time,
limited_availability_reason: None,
},
)
.inject()?,
);
if dry_run {
info!(
"Migration dry-run completed in {} seconds, your repository wasn't modified, run with `dry_run=False` to actually migrate",
start_time.elapsed().as_secs()
);
Ok(())
} else {
do_migrate(&repo, repo_info, start_time, delete_unused_v1_files).await
}
}
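/// Rolls back a failed migration by deleting the generated repo info file.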
async fn delete_repo_info(repo: &Repository) -> MigrationResult<()> {
warn!("Deleting generated repo info file");
repo.storage()
.delete_objects(
repo.storage_settings(),
"",
stream::iter([(REPO_INFO_FILE_PATH.to_string(), 0)]).boxed(),
)
.await
.inject()?;
Ok(())
}
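/// Key of the V1 `main` branch ref, relative to the refs prefix.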
const V1_DEFAULT_BRANCH_KEY: &str = "branch.main/ref.json";
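/// Deletes all V1 ref files: first the `main` branch ref, then every
/// remaining object under the V1 refs prefix, verifying afterwards that
/// nothing is left behind.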
async fn delete_v1_refs(repo: &Repository) -> MigrationResult<()> {
info!("Deleting V1 references");
repo.storage()
.delete_objects(
repo.storage_settings(),
V1_REFS_FILE_PATH,
stream::iter([(V1_DEFAULT_BRANCH_KEY.to_string(), 0)]).boxed(),
)
.await
.inject()?;
info!("V1 main branch reference deleted");
let all = repo
.storage()
.list_objects(repo.storage_settings(), V1_REFS_FILE_PATH)
.await
.inject()?;
let delete_keys =
all.map_ok(|li| (li.id, 0)).boxed().try_collect::<Vec<_>>().await.inject()?;
if !delete_keys.is_empty() {
repo.storage()
.delete_objects(
repo.storage_settings(),
V1_REFS_FILE_PATH,
stream::iter(delete_keys).boxed(),
)
.await
.inject()?;
}
info!("All V1 references deleted, verifying");
let remaining = repo
.storage()
.list_objects(repo.storage_settings(), V1_REFS_FILE_PATH)
.await
.inject()?
.try_collect::<Vec<_>>()
.await
.inject()?;
if remaining.is_empty() {
info!("All V1 references have been deleted");
Ok(())
} else {
error!(
"Found {} remaining V1 references that couldn't be deleted",
remaining.len()
);
Err(MigrationError::capture(MigrationErrorKind::Unknown))
}
}
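/// Deletes the V1 `config.yaml`; its contents were already embedded in the
/// V2 repo info file.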
async fn delete_config_yaml(repo: &Repository) -> MigrationResult<()> {
info!("Deleting V1 config.yaml");
repo.storage()
.delete_objects(
repo.storage_settings(),
"",
stream::iter([(CONFIG_FILE_PATH.to_string(), 0)]).boxed(),
)
.await
.inject()?;
info!("V1 config.yaml deleted");
Ok(())
}
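/// Streams every ref (branches and tags) paired with the snapshot it points
/// to.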
async fn all_roots<'a>(
repo: &'a Repository,
) -> RefResult<impl Stream<Item = RefResult<(Ref, SnapshotId)>> + 'a> {
let all_refs = list_refs(repo.storage().as_ref(), repo.storage_settings()).await?;
let roots = stream::iter(all_refs).then(move |r| async move {
r.fetch(repo.storage().as_ref(), repo.storage_settings())
.await
.map(|ref_data| (r, ref_data.snapshot))
});
Ok(roots)
}
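/// Streams every snapshot reachable from the given leaves, walking each
/// ancestry until it reaches a snapshot that was already visited. Each
/// snapshot is yielded exactly once.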
async fn pointed_snapshots<'a>(
repo: &'a Repository,
leaves: impl Iterator<Item = &SnapshotId> + 'a,
) -> MigrationResult<impl Stream<Item = MigrationResult<SnapshotInfo>> + 'a> {
let mut seen: HashSet<SnapshotId> = HashSet::new();
let res = try_stream! {
for pointed_snap_id in leaves {
            if !seen.contains(pointed_snap_id) {
let parents = repo.ancestry(&VersionInfo::SnapshotId(pointed_snap_id.clone())).await.inject()?;
for await parent in parents {
let parent = parent.inject()?;
if seen.insert(parent.id.clone()) {
debug!("Found snapshot {}", parent.id);
yield parent
                    } else {
                        // An already-visited ancestor means the rest of this
                        // ancestry was yielded through another leaf.
                        break
                    }
}
}
}
};
Ok(res)
}
#[cfg(all(test, feature = "object-store-fs"))]
#[expect(clippy::expect_used)]
mod tests {
    use std::{collections::HashMap, path::Path};

    use futures::TryStreamExt as _;
    use icechunk_macros::tokio_test;
    use tempfile::{TempDir, tempdir};

    use super::*;
    use crate::{RepositoryConfig, new_local_filesystem_storage, refs};
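    /// Copies the checked-in V1 test repository into a temp dir and opens it.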
async fn prepare_v1_repo() -> Result<(Repository, TempDir), Box<dyn std::error::Error>>
{
let dir = tempdir().expect("cannot create temp dir");
let source_path = Path::new("../icechunk-python/tests/data/test-repo-v1");
fs_extra::copy_items(&[source_path], &dir, &Default::default())?;
let storage =
new_local_filesystem_storage(dir.path().join("test-repo-v1").as_path())
.await?;
let repo =
Repository::open(None, Arc::clone(&storage), Default::default()).await?;
Ok((repo, dir))
}
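    /// End-to-end migration: ref ancestries, deleted tags, ops log ordering,
    /// and the persisted config must all survive the V1 -> V2 migration.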
#[tokio_test]
async fn test_1_to_2_migration() -> Result<(), Box<dyn std::error::Error>> {
use crate::config::CachingConfig;
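        // Persist a config with `num_chunk_refs = 10_000`, then reopen with a
        // runtime override of 99; after migration, opening with no config must
        // yield the persisted value.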
let (repo, _tmp) = prepare_v1_repo().await?;
let storage = Arc::clone(repo.storage());
let repo = Repository::open(
Some(RepositoryConfig {
caching: Some(CachingConfig {
num_chunk_refs: Some(10_000),
..Default::default()
}),
..Default::default()
}),
Arc::clone(&storage),
Default::default(),
)
.await?;
repo.save_config().await?;
let repo = Repository::open(
Some(RepositoryConfig {
caching: Some(CachingConfig {
num_chunk_refs: Some(99),
..Default::default()
}),
..Default::default()
}),
Arc::clone(&storage),
Default::default(),
)
.await?;
let mut tag_ancestries_before = HashMap::new();
for tag in repo.list_tags().await? {
let anc = repo
.ancestry(&VersionInfo::TagRef(tag.clone()))
.await?
.try_collect::<Vec<_>>()
.await?;
tag_ancestries_before.insert(tag, anc);
}
let mut branch_ancestries_before = HashMap::new();
for branch in repo.list_branches().await? {
let anc = repo
.ancestry(&VersionInfo::BranchTipRef(branch.clone()))
.await?
.try_collect::<Vec<_>>()
.await?;
branch_ancestries_before.insert(branch, anc);
}
migrate_1_to_2(repo, false, true, None).await.unwrap();
let repo = Repository::open(None, storage, Default::default()).await?;
let mut tag_ancestries_after = HashMap::new();
for tag in repo.list_tags().await? {
let anc = repo
.ancestry(&VersionInfo::TagRef(tag.clone()))
.await?
.try_collect::<Vec<_>>()
.await?;
tag_ancestries_after.insert(tag, anc);
}
let mut branch_ancestries_after = HashMap::new();
for branch in repo.list_branches().await? {
let anc = repo
.ancestry(&VersionInfo::BranchTipRef(branch.clone()))
.await?
.try_collect::<Vec<_>>()
.await?;
branch_ancestries_after.insert(branch, anc);
}
assert_eq!(tag_ancestries_before, tag_ancestries_after);
assert_eq!(branch_ancestries_before, branch_ancestries_after);
let (info, _) = repo.asset_manager().fetch_repo_info().await?;
let deleted_tags: Vec<_> = info.deleted_tags()?.collect();
assert_eq!(deleted_tags, vec!["deleted"]);
let (ops_log_stream, _, _) = repo.ops_log().await?;
let ops_log: Vec<_> = ops_log_stream.try_collect().await?;
let (_, ref first_update, _) = ops_log[0];
assert_eq!(
*first_update,
UpdateType::RepoMigratedUpdate {
from_version: SpecVersionBin::V1,
to_version: SpecVersionBin::V2,
}
);
assert!(
ops_log.iter().any(|(_, ut, _)| *ut == UpdateType::RepoInitializedUpdate),
"ops log should contain RepoInitializedUpdate"
);
let commit_count = ops_log
.iter()
.filter(|(_, ut, _)| matches!(ut, UpdateType::NewCommitUpdate { .. }))
.count();
assert_eq!(
commit_count, 5,
"should have 5 NewCommitUpdate entries (6 snapshots - 1 root)"
);
assert!(
ops_log.iter().any(|(_, ut, _)| *ut
== UpdateType::BranchCreatedUpdate { name: "my-branch".to_string() }),
"ops log should contain BranchCreatedUpdate for my-branch"
);
for tag_name in ["it works!", "it also works!", "deleted"] {
assert!(
ops_log.iter().any(|(_, ut, _)| *ut
== UpdateType::TagCreatedUpdate { name: tag_name.to_string() }),
"ops log should contain TagCreatedUpdate for '{tag_name}'"
);
}
let deleted_tag_created_idx = ops_log
.iter()
.position(|(_, ut, _)| {
*ut == UpdateType::TagCreatedUpdate { name: "deleted".to_string() }
})
.expect("TagCreatedUpdate for 'deleted' not found");
let deleted_tag_deleted_idx = ops_log
.iter()
.position(|(_, ut, _)| {
matches!(ut, UpdateType::TagDeletedUpdate { name, .. } if name == "deleted")
})
.expect("TagDeletedUpdate for 'deleted' not found");
let deleted_tag_snap_id = ops_log
.iter()
.find_map(|(_, ut, _)| {
if let UpdateType::TagDeletedUpdate { name, previous_snap_id } = ut
&& name == "deleted"
{
return Some(previous_snap_id.clone());
}
None
})
.expect("TagDeletedUpdate for 'deleted' not found");
let commit_idx = ops_log
.iter()
.position(|(_, ut, _)| {
matches!(ut, UpdateType::NewCommitUpdate { new_snap_id, .. } if *new_snap_id == deleted_tag_snap_id)
})
.expect("NewCommitUpdate for the deleted tag's snapshot not found");
assert!(
deleted_tag_deleted_idx < deleted_tag_created_idx,
"TagDeletedUpdate (idx {deleted_tag_deleted_idx}) should be newer than TagCreatedUpdate (idx {deleted_tag_created_idx})"
);
assert!(
deleted_tag_created_idx < commit_idx,
"TagCreatedUpdate (idx {deleted_tag_created_idx}) should be newer than NewCommitUpdate (idx {commit_idx})"
);
for window in ops_log.windows(2) {
let (time_a, _, _) = &window[0];
let (time_b, _, _) = &window[1];
assert!(
time_a > time_b,
"ops log timestamps must be strictly decreasing: {time_a} should be > {time_b}"
);
}
assert_eq!(
repo.config().caching().num_chunk_refs(),
10_000,
"persisted caching config should survive migration unchanged"
);
Ok(())
}
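    /// Migrating a repository that is already V2 must fail.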
#[tokio_test]
async fn test_1_to_2_migration_already_v2() -> Result<(), Box<dyn std::error::Error>>
{
let (repo, _tmp) = prepare_v1_repo().await?;
let storage = Arc::clone(repo.storage());
migrate_1_to_2(repo, false, true, None).await.unwrap();
let repo = Repository::open(None, storage, Default::default()).await?;
let result = migrate_1_to_2(repo, false, true, None).await;
assert!(result.is_err(), "migrating an already-V2 repo should return an error");
Ok(())
}
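    /// A dry run must leave the repository untouched at spec V1.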
#[tokio_test]
async fn test_1_to_2_migration_dry_run() -> Result<(), Box<dyn std::error::Error>> {
let (repo, _tmp) = prepare_v1_repo().await?;
let storage = Arc::clone(repo.storage());
migrate_1_to_2(repo, true, true, None).await.unwrap();
let repo = Repository::open(None, storage, Default::default()).await?;
assert_eq!(repo.spec_version(), SpecVersionBin::V1);
Ok(())
}
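    /// Migrating without deleting V1 files must leave the V1 refs in place.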
#[tokio_test]
async fn test_1_to_2_migration_without_delete()
-> Result<(), Box<dyn std::error::Error>> {
let (repo, _tmp) = prepare_v1_repo().await?;
let storage = Arc::clone(repo.storage());
migrate_1_to_2(repo, false, false, None).await.unwrap();
let repo = Repository::open(None, storage, Default::default()).await?;
assert_eq!(repo.spec_version(), SpecVersionBin::V2);
assert_eq!(
refs::list_branches(repo.storage().as_ref(), repo.storage_settings()).await?,
["main".to_string(), "my-branch".to_string()].into()
);
Ok(())
}
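    /// The ops log chain must remain walkable, with strictly decreasing
    /// timestamps, as new updates are appended after migration.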
#[tokio_test]
async fn test_ops_log_chain_after_migration() -> Result<(), Box<dyn std::error::Error>>
{
let (repo, _tmp) = prepare_v1_repo().await?;
let storage = Arc::clone(repo.storage());
migrate_1_to_2(repo, false, true, None).await.unwrap();
let config = RepositoryConfig {
num_updates_per_repo_info_file: Some(3),
..Default::default()
};
let repo =
Repository::open(Some(config), Arc::clone(&storage), Default::default())
.await?;
let (stream, _, _) = repo.ops_log().await?;
let ops: Vec<_> = stream.try_collect().await?;
let mut expected_len = ops.len();
assert!(
matches!(ops.last().unwrap().1, UpdateType::RepoInitializedUpdate),
"ops log chain broken after migration: last entry is not RepoInitializedUpdate"
);
let snap_id = repo.lookup_branch("main").await?;
for i in 0..5 {
repo.create_tag(&format!("post-migration-tag-{i}"), &snap_id).await?;
expected_len += 1;
let (stream, _, _) = repo.ops_log().await?;
let ops: Vec<_> = stream.try_collect().await?;
assert_eq!(
ops.len(),
expected_len,
"ops log length mismatch on iteration {i}"
);
assert!(
matches!(ops.last().unwrap().1, UpdateType::RepoInitializedUpdate),
"ops log chain broken on iteration {i}: last entry is not RepoInitializedUpdate"
);
for window in ops.windows(2) {
let (time_a, _, _) = &window[0];
let (time_b, _, _) = &window[1];
assert!(
time_a > time_b,
"ops log timestamps must be strictly decreasing: {time_a} should be > {time_b}"
);
}
}
Ok(())
}
}