use async_stream::try_stream;
use std::{
collections::{BTreeSet, HashMap, HashSet},
future::ready,
ops::RangeBounds as _,
sync::Arc,
};
use chrono::{DateTime, Utc};
use futures::{
Stream, StreamExt as _, TryStreamExt as _,
stream::{self, FuturesOrdered, FuturesUnordered},
};
use itertools::Itertools as _;
use regex::bytes::Regex;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{join, sync::AcquireError, task::JoinError, try_join};
use tracing::{Instrument as _, debug, error, instrument, trace};
use crate::{
Storage,
asset_manager::AssetManager,
change_set::{ChangeSet, transaction_log_from_change_set},
config::{
Credentials, DEFAULT_MAX_CONCURRENT_REQUESTS, ManifestPreloadCondition,
RepositoryConfig,
},
diff::{Diff, DiffBuilder},
error::ICError,
feature_flags::{
CREATE_TAG_FLAG, DELETE_TAG_FLAG, FEATURE_FLAGS, FeatureFlag, MOVE_NODE_FLAG,
find_feature_flag_id, raise_if_feature_flag_disabled,
},
format::{
IcechunkFormatError, IcechunkFormatErrorKind, ManifestId, NodeId, Path,
SnapshotId,
format_constants::SpecVersionBin,
repo_info::{RepoAvailability, RepoInfo, RepoStatus, UpdateType},
snapshot::{
ManifestFileInfo, NodeData, NodeType, Snapshot, SnapshotInfo,
SnapshotProperties,
},
},
refs::{self, Ref, RefError, RefErrorKind},
session::{Session, SessionError, SessionErrorKind, SessionResult},
storage::{self, StorageErrorKind},
virtual_chunks::VirtualChunkResolver,
};
use icechunk_types::{ICResultExt as _, error::ICResultCtxExt as _};
/// Identifies a version of the repository to read from.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum VersionInfo {
    /// An explicit snapshot id.
    SnapshotId(SnapshotId),
    /// The snapshot a tag points to.
    TagRef(String),
    /// The snapshot at the tip of a branch.
    BranchTipRef(String),
    /// The last snapshot of `branch` at or before time `at`.
    AsOf { branch: String, at: DateTime<Utc> },
}
/// Like [`VersionInfo`], but restricted to versions that resolve purely via
/// refs (no timestamp-based `AsOf` lookup).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RefVersionInfo {
    SnapshotId(SnapshotId),
    TagRef(String),
    BranchTipRef(String),
}
impl TryFrom<&VersionInfo> for RefVersionInfo {
    // `AsOf` has no ref-only representation; its branch/timestamp pair
    // becomes the error value.
    type Error = (String, DateTime<Utc>);

    fn try_from(value: &VersionInfo) -> Result<Self, Self::Error> {
        match value {
            VersionInfo::AsOf { branch, at } => Err((branch.clone(), *at)),
            VersionInfo::SnapshotId(id) => Ok(Self::SnapshotId(id.clone())),
            VersionInfo::TagRef(name) => Ok(Self::TagRef(name.clone())),
            VersionInfo::BranchTipRef(name) => Ok(Self::BranchTipRef(name.clone())),
        }
    }
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum RepositoryErrorKind {
#[error(transparent)]
StorageError(#[from] StorageErrorKind),
#[error(transparent)]
FormatError(#[from] IcechunkFormatErrorKind),
#[error(transparent)]
Ref(#[from] RefErrorKind),
#[error("snapshot not found: `{id}`")]
SnapshotNotFound { id: SnapshotId },
#[error("branch {branch} does not have a snapshots before or at {at}")]
InvalidAsOfSpec { branch: String, at: DateTime<Utc> },
#[error("invalid snapshot id: `{0}`")]
InvalidSnapshotId(String),
#[error("tag error: `{0}`")]
Tag(String),
#[error("repositories can only be created in clean prefixes")]
ParentDirectoryNotClean,
#[error("the repository doesn't exist")]
RepositoryDoesntExist,
#[error("error in repository serialization")]
SerializationError(#[from] Box<rmp_serde::encode::Error>),
#[error("error in repository deserialization")]
DeserializationError(#[from] Box<rmp_serde::decode::Error>),
#[error(
"error finding conflicting path for node `{0}`, this probably indicades a bug in `rebase`"
)]
ConflictingPathNotFound(NodeId),
#[error("error in config deserialization")]
ConfigDeserializationError(#[from] serde_yaml_ng::Error),
#[error("config was updated by other session")]
ConfigWasUpdated,
#[error("branch update conflict: `({expected_parent:?}) != ({actual_parent:?})`")]
Conflict { expected_parent: Option<SnapshotId>, actual_parent: Option<SnapshotId> },
#[error("repo info object was updated after this session started")]
RepoInfoUpdated,
#[error("I/O error")]
IOError(#[from] std::io::Error),
#[error("a concurrent task failed")]
ConcurrencyError(#[from] JoinError),
#[error("the http request semaphore cannot be acquired")]
AcquireError(#[from] AcquireError),
#[error("main branch cannot be deleted")]
CannotDeleteMain,
#[error("the storage used by this Icechunk repository is read-only: {0}")]
ReadonlyStorage(String),
#[error("the repository status is read-only: {0}")]
ReadonlyRepository(String),
#[error(
"the first commit in the repository cannot be an amend, create a new commit instead"
)]
NoAmendForInitialCommit,
#[error(
"repository cannot be updated after {0} attempts, too many concurrent changes"
)]
RepoUpdateAttemptsLimit(u64),
#[error(
"repository version error, this operation requires a repository that is at least version {minimum_spec_version}, please upgrade your on-disk format before executing this operation"
)]
BadRepoVersion { minimum_spec_version: SpecVersionBin },
#[error("concurrency error, lock could not be acquired")]
PoisonLock,
#[error("unexpected error: {0}")]
Other(String),
}
/// Error type for all repository-level operations (kind plus context).
pub type RepositoryError = ICError<RepositoryErrorKind>;
/// Convenience alias for results returned by [`Repository`] methods.
pub type RepositoryResult<T> = Result<T, RepositoryError>;
/// A handle to an Icechunk repository: storage backend, effective
/// configuration, and the asset manager used to read/write repository
/// objects.
#[derive(Debug, Serialize, Deserialize)]
pub struct Repository {
    // On-disk format version this repository was opened or created with.
    spec_version: SpecVersionBin,
    // Effective configuration (defaults merged with persisted + user config).
    config: RepositoryConfig,
    // Settings used for all storage operations.
    storage_settings: storage::Settings,
    // Version of the persisted config object; used for compare-and-swap
    // in `save_config` on v1 repositories.
    config_version: storage::VersionInfo,
    storage: Arc<dyn Storage + Send + Sync>,
    // Caching reader/writer for snapshots, manifests, repo info, etc.
    asset_manager: Arc<AssetManager>,
    // Resolves virtual chunk references against configured containers.
    virtual_resolver: Arc<VirtualChunkResolver>,
    // Credentials the user explicitly authorized, per container prefix.
    authorized_virtual_containers: HashMap<String, Option<Credentials>>,
    // Properties merged into every commit by default.
    default_commit_metadata: SnapshotProperties,
}
impl Repository {
#[instrument(skip_all)]
/// Create a brand-new repository in `storage`.
///
/// Writes the initial snapshot and, depending on `spec_version`, either a
/// repo-info object (v2+) or a `main` branch ref plus an optional config
/// object (v1). Fails if storage is read-only, or — when
/// `check_clean_root` is set — if the root prefix already contains data.
pub async fn create(
    config: Option<RepositoryConfig>,
    storage: Arc<dyn Storage + Send + Sync>,
    authorize_virtual_chunk_access: HashMap<String, Option<Credentials>>,
    spec_version: Option<SpecVersionBin>,
    check_clean_root: bool,
) -> RepositoryResult<Self> {
    debug!("Creating Repository");
    raise_if_cant_write(storage.as_ref(), "Cannot create repository").await?;
    // Only persist the config if the caller actually customized it.
    let has_overriden_config = match config {
        Some(ref config) => config != &RepositoryConfig::default(),
        None => false,
    };
    let storage_defaults = storage.default_settings().await.inject()?;
    let config = config.unwrap_or_default();
    // Caller-provided storage settings override the backend defaults.
    let storage_settings = match config.storage.clone() {
        Some(user_storage) => storage_defaults.merge(user_storage),
        None => storage_defaults,
    };
    let config =
        RepositoryConfig { storage: Some(storage_settings.clone()), ..config };
    let spec_version = spec_version.unwrap_or_default();
    let asset_manager = Arc::new(AssetManager::new_with_config(
        Arc::clone(&storage),
        storage_settings.clone(),
        spec_version,
        config.caching(),
        config.compression().level(),
        config.max_concurrent_requests(),
    ));
    // Refuse to create a repository on top of pre-existing data.
    if check_clean_root && !storage.root_is_clean(&storage_settings).await.inject()? {
        return Err(RepositoryError::capture(
            RepositoryErrorKind::ParentDirectoryNotClean,
        ));
    };
    let asset_manager_c = Arc::clone(&asset_manager);
    let storage_c = Arc::clone(&storage);
    let settings_ref = &storage_settings;
    let num_updates = config.num_updates_per_repo_info_file();
    let config_ref = &config;
    // Future that writes the initial snapshot plus the version-specific
    // bookkeeping: repo info (v2+) or the `main` branch ref file (v1).
    let create_repo_info = async move {
        let new_snapshot = Arc::new(Snapshot::initial(spec_version).inject()?);
        let write_snap = asset_manager_c.write_snapshot(Arc::clone(&new_snapshot));
        if spec_version >= SpecVersionBin::V2 {
            // v2+: snapshot + empty transaction log written concurrently,
            // then the repo info object (which references both).
            let empty_tx_log = transaction_log_from_change_set(
                &Snapshot::INITIAL_SNAPSHOT_ID,
                &ChangeSet::for_edits(),
            );
            let snap_info = new_snapshot.as_ref().try_into().inject()?;
            let config_to_store =
                if has_overriden_config { Some(config_ref) } else { None };
            let repo_info = Arc::new(RepoInfo::initial(
                spec_version,
                snap_info,
                num_updates,
                config_to_store,
                None,
            ));
            let write_tx = asset_manager_c.write_transaction_log(
                Snapshot::INITIAL_SNAPSHOT_ID,
                Arc::new(empty_tx_log),
            );
            try_join!(write_snap, write_tx)?;
            asset_manager_c.create_repo_info(Arc::clone(&repo_info)).await?;
        } else {
            // v1: snapshot first, then point `main` at it.
            write_snap.await?;
            refs::update_branch(
                storage_c.as_ref(),
                settings_ref,
                Ref::DEFAULT_BRANCH,
                new_snapshot.id().clone(),
                None,
            )
            .await
            .inject()?;
        }
        Ok::<_, RepositoryError>(())
    }
    .in_current_span();
    let config_version = if spec_version >= SpecVersionBin::V2 {
        // v2+: the config lives inside the repo info object, so there is
        // no separate config object version.
        create_repo_info.await?;
        storage::VersionInfo::for_creation()
    } else {
        // v1: if customized, the config is a standalone object written
        // concurrently with the snapshot/ref creation.
        let storage_c = Arc::clone(&storage);
        let config_c = config.clone();
        let update_config = async move {
            if has_overriden_config {
                let version = Repository::store_config(
                    storage_c,
                    &config_c,
                    &storage::VersionInfo::for_creation(),
                )
                .await?;
                Ok::<_, RepositoryError>(version)
            } else {
                Ok(storage::VersionInfo::for_creation())
            }
        }
        .in_current_span();
        let (_, config_version) = try_join!(create_repo_info, update_config)?;
        config_version
    };
    debug_assert!(Self::exists(Arc::clone(&storage), None).await.unwrap_or(false));
    Self::new(
        spec_version,
        config,
        config_version,
        storage,
        storage_settings,
        asset_manager,
        authorize_virtual_chunk_access,
    )
}
#[instrument(skip_all)]
/// Open an existing repository from `storage`.
///
/// Detects the on-disk spec version and loads any persisted config
/// concurrently, then builds the effective config as:
/// defaults <- persisted config <- caller-provided `config`.
/// Fails with `RepositoryDoesntExist` when no repository is found.
pub async fn open(
    config: Option<RepositoryConfig>,
    storage: Arc<dyn Storage + Send + Sync>,
    authorize_virtual_chunk_access: HashMap<String, Option<Credentials>>,
) -> RepositoryResult<Self> {
    debug!("Opening Repository");
    let temp_settings = storage.default_settings().await.inject()?;
    // Temporary, cache-less asset manager used only during detection.
    let temp_am = AssetManager::new_no_cache(
        Arc::clone(&storage),
        temp_settings,
        SpecVersionBin::current(),
        1,
        DEFAULT_MAX_CONCURRENT_REQUESTS,
    );
    let storage_c = Arc::clone(&storage);
    let user_settings = config.as_ref().and_then(|c| c.storage().cloned());
    // Probe the spec version and fetch the (possible) v1 YAML config
    // concurrently; the config result is only used on the v1 path.
    let fetch_version =
        tokio::spawn(Self::fetch_spec_version(storage_c, user_settings));
    let fetch_config_yaml = temp_am.fetch_config();
    let (spec_version_result, config_yaml_result) =
        join!(fetch_version, fetch_config_yaml);
    let spec_version = match spec_version_result.capture()?? {
        Some(v) => Ok(v),
        None => {
            Err(RepositoryError::capture(RepositoryErrorKind::RepositoryDoesntExist))
        }
    }?;
    trace!(%spec_version, "Repository version found");
    // v2+: config is embedded in the repo info; v1: standalone YAML object.
    let (persisted_config, config_version) = if spec_version >= SpecVersionBin::V2 {
        let (repo_info, _) = temp_am.fetch_repo_info().await?;
        (repo_info.config().inject()?, storage::VersionInfo::for_creation())
    } else {
        match config_yaml_result? {
            Some((c, v)) => (Some(c), v),
            None => (None, storage::VersionInfo::for_creation()),
        }
    };
    let storage_defaults = storage.default_settings().await.inject()?;
    let repo_config = match persisted_config {
        Some(c) => RepositoryConfig::default().merge(c),
        None => RepositoryConfig::default(),
    };
    // The caller's config has the highest precedence.
    let merged_config = config.map(|c| repo_config.merge(c)).unwrap_or(repo_config);
    let storage_settings = match merged_config.storage.clone() {
        Some(s) => storage_defaults.merge(s),
        None => storage_defaults,
    };
    let final_config =
        RepositoryConfig { storage: Some(storage_settings.clone()), ..merged_config };
    let asset_manager = Arc::new(AssetManager::new_with_config(
        Arc::clone(&storage),
        storage_settings.clone(),
        spec_version,
        final_config.caching(),
        final_config.compression().level(),
        final_config.max_concurrent_requests(),
    ));
    Self::new(
        spec_version,
        final_config,
        config_version,
        storage,
        storage_settings,
        asset_manager,
        authorize_virtual_chunk_access,
    )
}
/// Open the repository in `storage` if one exists, otherwise create it
/// (using `create_version` and `check_clean_root` for the creation path).
pub async fn open_or_create(
    config: Option<RepositoryConfig>,
    storage: Arc<dyn Storage + Send + Sync>,
    authorize_virtual_chunk_access: HashMap<String, Option<Credentials>>,
    create_version: Option<SpecVersionBin>,
    check_clean_root: bool,
) -> RepositoryResult<Self> {
    let user_settings = config.as_ref().and_then(|c| c.storage().cloned());
    let existing =
        Self::fetch_spec_version(Arc::clone(&storage), user_settings).await?;
    match existing {
        Some(_) => Self::open(config, storage, authorize_virtual_chunk_access).await,
        None => {
            Self::create(
                config,
                storage,
                authorize_virtual_chunk_access,
                create_version,
                check_clean_root,
            )
            .await
        }
    }
}
/// Internal constructor shared by `create`, `open`, and `reopen`.
/// Validates the authorized credentials against the config and builds
/// the virtual chunk resolver.
fn new(
    spec_version: SpecVersionBin,
    config: RepositoryConfig,
    config_version: storage::VersionInfo,
    storage: Arc<dyn Storage + Send + Sync>,
    storage_settings: storage::Settings,
    asset_manager: Arc<AssetManager>,
    authorized_virtual_containers: HashMap<String, Option<Credentials>>,
) -> RepositoryResult<Self> {
    let containers = config.virtual_chunk_containers().cloned();
    validate_credentials(&config, &authorized_virtual_containers)?;
    let resolver = VirtualChunkResolver::new(
        containers,
        authorized_virtual_containers.clone(),
        storage_settings.clone(),
    );
    Ok(Self {
        spec_version,
        config,
        config_version,
        storage,
        storage_settings,
        virtual_resolver: Arc::new(resolver),
        asset_manager,
        authorized_virtual_containers,
        default_commit_metadata: SnapshotProperties::default(),
    })
}
#[instrument(skip_all)]
/// True iff a repository (of any spec version) exists in `storage`.
pub async fn exists(
    storage: Arc<dyn Storage + Send + Sync>,
    settings: Option<storage::Settings>,
) -> RepositoryResult<bool> {
    let found = Self::fetch_spec_version(storage, settings).await?;
    Ok(found.is_some())
}
#[instrument(skip_all)]
/// Detect the spec version of the repository in `storage`, or `None` if
/// no repository exists.
///
/// Probes both layouts concurrently: the v1 `main` branch ref file, and
/// the v2+ repo info object (which records its own spec version). The
/// repo info result wins; the v1 probe is only consulted when no repo
/// info object exists.
pub async fn fetch_spec_version(
    storage: Arc<dyn Storage + Send + Sync>,
    settings: Option<storage::Settings>,
) -> RepositoryResult<Option<SpecVersionBin>> {
    let settings = match settings {
        Some(s) => s,
        None => storage.default_settings().await.inject()?,
    };
    let storage_c = Arc::clone(&storage);
    let settings_c = settings.clone();
    // Probe 1: does a v1 `main` ref exist? Missing ref => not a v1 repo.
    let is_v1 = async move {
        match refs::fetch_branch_tip_v1(
            storage_c.as_ref(),
            &settings_c,
            Ref::DEFAULT_BRANCH,
        )
        .await
        {
            Ok(_) => Ok(true),
            Err(RefError { kind: RefErrorKind::RefNotFound(_), .. }) => Ok(false),
            Err(err) => Err(err),
        }
    }
    .in_current_span();
    // Probe 2: fetch the v2+ repo info and read its recorded spec version.
    // The inner Result is deliberately kept so failure here doesn't abort
    // the try_join; it is inspected below.
    let after_v1 = async move {
        let temp_asset_manager = Arc::new(AssetManager::new_no_cache(
            Arc::clone(&storage),
            settings,
            SpecVersionBin::current(),
            1, DEFAULT_MAX_CONCURRENT_REQUESTS,
        ));
        let res = temp_asset_manager.fetch_repo_info().await;
        Ok(res.and_then(|(ri, _)| ri.spec_version().inject()))
    }
    .in_current_span();
    let (is_v1, after_v1) = try_join!(is_v1, after_v1).inject()?;
    match after_v1 {
        Ok(v) => Ok(Some(v)),
        // No repo info object: fall back to the v1 probe result.
        Err(RepositoryError {
            kind: RepositoryErrorKind::RepositoryDoesntExist,
            ..
        }) => {
            if is_v1 {
                Ok(Some(SpecVersionBin::V1))
            } else {
                Ok(None)
            }
        }
        Err(err) => Err(err),
    }
}
#[instrument(skip_all)]
/// Build a new `Repository` handle from this one, optionally overriding
/// the config and/or the authorized virtual chunk credentials.
///
/// Storage, asset manager, and config version are shared with `self`.
pub async fn reopen(
    &self,
    config: Option<RepositoryConfig>,
    authorize_virtual_chunk_access: Option<HashMap<String, Option<Credentials>>>,
) -> RepositoryResult<Self> {
    let storage_defaults = self.storage.default_settings().await.inject()?;
    let repo_config = self.config().clone();
    // The caller's overrides take precedence over the current config.
    let config = config.map(|c| repo_config.merge(c)).unwrap_or(repo_config);
    let storage_settings = match config.storage.clone() {
        Some(s) => storage_defaults.merge(s),
        None => storage_defaults,
    };
    let config =
        RepositoryConfig { storage: Some(storage_settings.clone()), ..config };
    Self::new(
        self.spec_version,
        config,
        self.config_version.clone(),
        Arc::clone(&self.storage),
        storage_settings,
        Arc::clone(&self.asset_manager),
        // Keep the current credentials unless the caller supplies new ones.
        authorize_virtual_chunk_access
            .unwrap_or_else(|| self.authorized_virtual_containers.clone()),
    )
}
#[instrument(skip(bytes))]
/// Deserialize a `Repository` previously produced by [`Repository::as_bytes`].
pub fn from_bytes(bytes: &[u8]) -> RepositoryResult<Self> {
    let decoded = rmp_serde::from_slice(bytes).map_err(Box::new);
    decoded.capture()
}
#[instrument(skip(self))]
/// Serialize this `Repository` handle with MessagePack.
pub fn as_bytes(&self) -> RepositoryResult<Vec<u8>> {
    let encoded = rmp_serde::to_vec(self).map_err(Box::new);
    encoded.capture()
}
#[instrument(skip_all)]
/// Fetch the persisted repository config (if any) together with its
/// storage version, which callers can use for compare-and-swap updates.
///
/// For spec v2+ the config is embedded in the repo info object; for v1
/// it is a standalone object fetched through the asset manager.
pub async fn fetch_config(
    storage: Arc<dyn Storage + Send + Sync>,
) -> RepositoryResult<Option<(RepositoryConfig, storage::VersionInfo)>> {
    let settings = storage.default_settings().await.inject()?;
    let spec_version = Self::fetch_spec_version(Arc::clone(&storage), None)
        .await?
        .unwrap_or_default();
    let am = AssetManager::new_no_cache(
        Arc::clone(&storage),
        settings,
        spec_version,
        1, DEFAULT_MAX_CONCURRENT_REQUESTS,
    );
    if spec_version >= SpecVersionBin::V2 {
        let (repo_info, version) = am.fetch_repo_info().await?;
        Ok(repo_info.config().inject()?.map(|config| (config, version)))
    } else {
        am.fetch_config().await
    }
}
#[instrument(skip_all)]
/// Persist this handle's in-memory config.
///
/// Spec v2+: rewrites the repo info object, retrying on concurrent
/// updates. Spec v1: compare-and-swap writes the standalone config
/// object against `self.config_version`.
pub async fn save_config(&self) -> RepositoryResult<storage::VersionInfo> {
    if self.spec_version >= SpecVersionBin::V2 {
        let config = self.config().clone();
        let num_updates = self.config.num_updates_per_repo_info_file();
        let spec_version = self.spec_version();
        // Applied (possibly several times) by the retrying updater.
        let do_update = move |repo_info: Arc<RepoInfo>, backup_path: &str, _| {
            Ok(Arc::new(
                repo_info
                    .set_config(spec_version, &config, backup_path, num_updates)
                    .inject()?,
            ))
        };
        let version = self
            .asset_manager
            .update_repo_info(self.config.repo_update_retries().retries(), do_update)
            .await?;
        Ok(version)
    } else {
        Repository::store_config(
            Arc::clone(self.storage()),
            self.config(),
            &self.config_version,
        )
        .await
    }
}
#[instrument(skip_all)]
/// Set the properties attached by default to every commit created
/// through this handle.
pub fn set_default_commit_metadata(&mut self, metadata: SnapshotProperties) {
    self.default_commit_metadata = metadata;
}
#[instrument(skip_all)]
/// The properties attached by default to every commit.
pub fn default_commit_metadata(&self) -> &SnapshotProperties {
    &self.default_commit_metadata
}
#[instrument(skip_all)]
/// Fetch the repository-level metadata (spec v2+ only).
pub async fn get_metadata(&self) -> RepositoryResult<SnapshotProperties> {
    self.asset_manager().fail_unless_spec_at_least(SpecVersionBin::V2)?;
    let (repo_info, _version) = self.asset_manager().fetch_repo_info().await?;
    repo_info.metadata().inject()
}
#[instrument(skip(self, metadata))]
/// Merge `metadata` into the existing repository metadata and persist it;
/// keys in `metadata` overwrite existing keys.
///
/// Returns the resulting merged metadata.
pub async fn update_metadata(
    &self,
    metadata: &SnapshotProperties,
) -> RepositoryResult<SnapshotProperties> {
    self.raise_if_cant_write("Cannot set metadata").await?;
    let mut final_metadata = Default::default();
    let num_updates = self.config().num_updates_per_repo_info_file();
    // The updater may run more than once (retries on concurrent repo-info
    // updates); each attempt recomputes the merge from the fresh repo info,
    // so `final_metadata` ends up reflecting the attempt that succeeded.
    let do_update = |repo_info: Arc<RepoInfo>, backup_path: &str, _| {
        final_metadata = repo_info.metadata().inject()?;
        final_metadata.extend(metadata.clone());
        Ok(Arc::new(
            repo_info
                .set_metadata(
                    self.spec_version(),
                    &final_metadata,
                    backup_path,
                    num_updates,
                )
                .inject()?,
        ))
    };
    let _ = self
        .asset_manager
        .update_repo_info(self.config.repo_update_retries().retries(), do_update)
        .await?;
    Ok(final_metadata)
}
#[instrument(skip(self, metadata))]
/// Replace the repository metadata with `metadata`, retrying on
/// concurrent repo-info updates.
pub async fn set_metadata(
    &self,
    metadata: &SnapshotProperties,
) -> RepositoryResult<()> {
    self.raise_if_cant_write("Cannot set metadata").await?;
    let num_updates = self.config().num_updates_per_repo_info_file();
    let retries = self.config.repo_update_retries().retries();
    let updater = |info: Arc<RepoInfo>, backup_path: &str, _| {
        let updated = info
            .set_metadata(self.spec_version(), metadata, backup_path, num_updates)
            .inject()?;
        Ok(Arc::new(updated))
    };
    let _ = self.asset_manager.update_repo_info(retries, updater).await?;
    Ok(())
}
#[instrument(skip_all)]
/// Fetch the repository status from the repo info (spec v2+ only).
pub async fn get_status(&self) -> RepositoryResult<RepoStatus> {
    self.asset_manager().fail_unless_spec_at_least(SpecVersionBin::V2)?;
    let (repo_info, _version) = self.asset_manager().fetch_repo_info().await?;
    repo_info.status().inject()
}
#[expect(unsafe_code)]
#[instrument(skip(self))]
/// Set the repository status in the repo info object (spec v2+ only).
pub async fn set_status(&self, status: &RepoStatus) -> RepositoryResult<()> {
    self.asset_manager().fail_unless_spec_at_least(SpecVersionBin::V2)?;
    self.raise_if_cant_write("Cannot set status").await?;
    let num_updates = self.config().num_updates_per_repo_info_file();
    let do_update = |repo_info: Arc<RepoInfo>, backup_path: &str, _| {
        Ok(Arc::new(
            repo_info
                .set_status(self.spec_version(), status, backup_path, num_updates)
                .inject()?,
        ))
    };
    // SAFETY: NOTE(review) `update_repo_info_unchecked` is the unsafe,
    // unchecked variant of `update_repo_info`; presumably it bypasses a
    // validation that would reject the update (so the status itself can
    // be changed) — confirm its documented contract at its definition.
    unsafe {
        let _ = self
            .asset_manager
            .update_repo_info_unchecked(
                self.config.repo_update_retries().retries(),
                do_update,
            )
            .await?;
    }
    Ok(())
}
#[instrument(skip(self))]
/// List every known feature flag together with its explicit setting for
/// this repository (`None` setting means the default applies).
pub async fn feature_flags(
    &self,
) -> RepositoryResult<impl Iterator<Item = FeatureFlag>> {
    let (repo_info, _) = self.get_repo_info().await?;
    let enabled_flags: HashSet<u16> = repo_info
        .enabled_feature_flags()
        .inject()?
        .map(|it| it.collect())
        .unwrap_or_default();
    let disabled_flags: HashSet<u16> = repo_info
        .disabled_feature_flags()
        .inject()?
        .map(|it| it.collect())
        .unwrap_or_default();
    let res = FEATURE_FLAGS.iter().map(move |(name, (id, default_enabled))| {
        // An explicit "enabled" entry wins over a "disabled" one.
        let setting = if enabled_flags.contains(id) {
            Some(true)
        } else if disabled_flags.contains(id) {
            Some(false)
        } else {
            None
        };
        FeatureFlag::new(*id, name, *default_enabled, setting)
    });
    Ok(res)
}
#[instrument(skip(self))]
/// The subset of feature flags that are currently in effect.
pub async fn enabled_feature_flags(
    &self,
) -> RepositoryResult<impl Iterator<Item = FeatureFlag>> {
    let all_flags = self.feature_flags().await?;
    Ok(all_flags.filter(|flag| flag.enabled()))
}
#[instrument(skip(self))]
/// The subset of feature flags that are currently not in effect.
pub async fn disabled_feature_flags(
    &self,
) -> RepositoryResult<impl Iterator<Item = FeatureFlag>> {
    let all_flags = self.feature_flags().await?;
    Ok(all_flags.filter(|flag| !flag.enabled()))
}
#[instrument(skip(self))]
/// Set the named feature flag to enabled (`Some(true)`), disabled
/// (`Some(false)`), or back to its default (`None`). Spec v2+ only.
pub async fn set_feature_flag(
    &self,
    feature_flag: &str,
    set_to: Option<bool>,
) -> RepositoryResult<()> {
    self.asset_manager().fail_unless_spec_at_least(SpecVersionBin::V2)?;
    let num_updates = self.config.num_updates_per_repo_info_file();
    // Fails early if the name is not a known flag.
    let flag_id = find_feature_flag_id(feature_flag).inject()?;
    let do_update = move |repo_info: Arc<RepoInfo>, backup_path: &str, _| {
        Ok(Arc::new(
            repo_info
                .update_feature_flag(
                    self.spec_version(),
                    flag_id,
                    set_to,
                    backup_path,
                    num_updates,
                )
                .inject()?,
        ))
    };
    let _ = self
        .asset_manager
        .update_repo_info(self.config.repo_update_retries().retries(), do_update)
        .await?;
    Ok(())
}
#[instrument(skip(storage, config))]
/// Persist `config` as the standalone config object (v1 layout), using
/// compare-and-swap against `previous_version`.
///
/// Returns the new object version; fails with `ConfigWasUpdated` when
/// another session changed the config concurrently.
pub(crate) async fn store_config(
    storage: Arc<dyn Storage + Send + Sync>,
    config: &RepositoryConfig,
    previous_version: &storage::VersionInfo,
) -> RepositoryResult<storage::VersionInfo> {
    raise_if_cant_write(storage.as_ref(), "Cannot save configuration").await?;
    let settings = storage.default_settings().await.inject()?;
    let am = AssetManager::new_no_cache(
        storage,
        settings,
        SpecVersionBin::current(),
        1, DEFAULT_MAX_CONCURRENT_REQUESTS,
    );
    // No backup is needed when this is the very first config write.
    let backup_path = if previous_version.is_create() {
        None
    } else {
        Some(am.backup_path_for_config())
    };
    match am
        .try_update_config(config, previous_version, backup_path.as_deref())
        .await?
    {
        Some(new_version) => Ok(new_version),
        // `None` means the CAS precondition failed.
        None => Err(RepositoryError::capture(RepositoryErrorKind::ConfigWasUpdated)),
    }
}
/// The effective repository configuration.
pub fn config(&self) -> &RepositoryConfig {
    &self.config
}
/// Settings used for all storage operations.
pub fn storage_settings(&self) -> &storage::Settings {
    &self.storage_settings
}
/// The underlying storage backend.
pub fn storage(&self) -> &Arc<dyn Storage + Send + Sync> {
    &self.storage
}
/// The asset manager used to read and write repository objects.
pub fn asset_manager(&self) -> &Arc<AssetManager> {
    &self.asset_manager
}
/// The on-disk spec version this handle was opened or created with.
pub fn spec_version(&self) -> SpecVersionBin {
    self.spec_version
}
/// The virtual chunk container prefixes the user provided credentials for.
pub fn authorized_virtual_container_prefixes(&self) -> HashSet<String> {
    self.authorized_virtual_containers.keys().cloned().collect()
}
#[instrument(skip(self))]
/// Stream the ancestry of `snapshot_id` (v1 layout), converting each
/// fetched snapshot into a lightweight `SnapshotInfo`.
async fn snapshot_info_ancestry_v1(
    &self,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<impl Stream<Item = RepositoryResult<SnapshotInfo>> + use<>>
{
    let res =
        self.snapshot_ancestry_v1(snapshot_id).await?.and_then(|snap| async move {
            let info = snap.as_ref().try_into().inject()?;
            Ok(info)
        });
    Ok(res)
}
#[instrument(skip(self))]
/// Stream full `Snapshot` objects from `snapshot_id` back through its
/// parents (v1 layout; uses the deprecated asset-manager API).
async fn snapshot_ancestry_v1(
    &self,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<impl Stream<Item = RepositoryResult<Arc<Snapshot>>> + use<>>
{
    let am = Arc::clone(&self.asset_manager);
    // The v1 ancestry API is deprecated but still needed for v1 repos.
    #[expect(deprecated)]
    am.snapshot_ancestry_v1(snapshot_id).await
}
#[instrument(skip(self))]
/// Stream the ancestry (most recent first) of the snapshot that
/// `version` resolves to.
pub async fn ancestry(
    &self,
    version: &VersionInfo,
) -> RepositoryResult<impl Stream<Item = RepositoryResult<SnapshotInfo>> + Send + use<>>
{
    self.ancestry_using(version, None).await
}
/// Shared ancestry implementation. For spec v2, an already-fetched
/// `repo_info` may be supplied to avoid refetching it.
async fn ancestry_using(
    &self,
    version: &VersionInfo,
    repo_info: Option<Arc<RepoInfo>>,
) -> RepositoryResult<impl Stream<Item = RepositoryResult<SnapshotInfo>> + Send + use<>>
{
    match self.spec_version {
        SpecVersionBin::V1 => Ok(self.ancestry_v1(version).await?.left_stream()),
        SpecVersionBin::V2 => {
            let ri = match repo_info {
                Some(ri) => ri,
                None => self.get_repo_info().await?.0,
            };
            let snapshot_id = self.resolve_version_v2(&ri, version).await?;
            // v2 ancestry is iterated directly from the repo info object,
            // so the stream is synchronous.
            let iter = AncestryIteratorV2::new(ri, &snapshot_id)?;
            Ok(stream::iter(iter).right_stream())
        }
    }
}
#[instrument(skip(self))]
/// Ancestry stream implementation for spec v1 repositories.
async fn ancestry_v1(
    &self,
    version: &VersionInfo,
) -> RepositoryResult<impl Stream<Item = RepositoryResult<SnapshotInfo>> + Send + use<>>
{
    let start = self.resolve_version(version).await?;
    self.snapshot_info_ancestry_v1(&start).await
}
#[instrument(skip(self))]
/// Ancestry stream for a ref-only version spec (spec v1 repositories).
async fn ancestry_ref_v1(
    &self,
    version: &RefVersionInfo,
) -> RepositoryResult<impl Stream<Item = RepositoryResult<SnapshotInfo>> + Send + use<>>
{
    let start = self.resolve_ref_version_v1(version).await?;
    self.snapshot_info_ancestry_v1(&start).await
}
#[instrument(skip(self))]
/// Stream the repository operations log, newest first.
///
/// Yields `(time, update type, repo-info file path)` triples, walking
/// backwards through the chain of backed-up repo-info files. Also
/// returns the current repo info object and its storage version.
pub async fn ops_log(
    &self,
) -> RepositoryResult<(
    impl Stream<Item = RepositoryResult<(DateTime<Utc>, UpdateType, Option<String>)>>
    + Send
    + use<>,
    Arc<RepoInfo>,
    storage::VersionInfo,
)> {
    let (repo_info, version_root) = self.get_repo_info().await?;
    let repo_info_root = Arc::clone(&repo_info);
    // `repo_info` walks the backup chain; `None` terminates the stream.
    let mut repo_info = Some(repo_info);
    let mut this_file_path = None;
    let asset_manager = Arc::clone(self.asset_manager());
    let stream = try_stream! {
        while let Some(this) = repo_info {
            for maybe_data in this.latest_updates().inject()? {
                let (a, b, c) = maybe_data.inject()?;
                // An update with no path of its own is attributed to the
                // backup file we navigated from (`this_file_path`).
                yield (b, a, c.or(this_file_path.as_deref()).map(|c| c.to_string()));
            }
            if let Some(previous) = this.repo_before_updates().inject()? {
                this_file_path = Some(previous.to_string());
                match asset_manager.fetch_repo_info_backup(previous).await {
                    Ok((new_one, _)) => {
                        repo_info = Some(new_one);
                    }
                    // A missing backup file ends the walk without error.
                    Err(RepositoryError{kind:RepositoryErrorKind::RepositoryDoesntExist, ..}) => {
                        repo_info = None;
                    }
                    Err(err) => {
                        repo_info = None;
                        Err(err)?;
                    }
                }
            } else {
                // No older backups: the chain is complete.
                repo_info = None;
            }
        }
    };
    Ok((stream, repo_info_root, version_root))
}
#[instrument(skip(self))]
/// Create a new branch pointing at `snapshot_id`, dispatching on the
/// repository's spec version.
pub async fn create_branch(
    &self,
    branch_name: &str,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<()> {
    self.raise_if_cant_write("Cannot create branch").await?;
    if self.spec_version() == SpecVersionBin::V1 {
        self.create_branch_v1(branch_name, snapshot_id).await
    } else {
        self.create_branch_v2(branch_name, snapshot_id).await
    }
}
#[instrument(skip(self))]
/// Add a branch to the repo info object (spec v2), retrying the
/// read-modify-write on concurrent repo-info updates.
async fn create_branch_v2(
    &self,
    branch_name: &str,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<()> {
    let num_updates = self.config.num_updates_per_repo_info_file();
    let do_update = |repo_info: Arc<RepoInfo>, backup_path: &str, _| {
        // The target snapshot must be known to this repo info.
        raise_if_invalid_snapshot_id_v2(repo_info.as_ref(), snapshot_id)?;
        Ok(Arc::new(
            repo_info
                .add_branch(
                    self.spec_version(),
                    branch_name,
                    snapshot_id,
                    backup_path,
                    num_updates,
                )
                .inject()?,
        ))
    };
    let _ = self
        .asset_manager
        .update_repo_info(self.config.repo_update_retries().retries(), do_update)
        .await?;
    Ok(())
}
#[instrument(skip(self))]
/// Create a branch as a v1 ref file.
///
/// Ref-level `Conflict` errors (branch already exists with a different
/// tip) are remapped to `RepositoryErrorKind::Conflict`, preserving the
/// original error context.
async fn create_branch_v1(
    &self,
    branch_name: &str,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<()> {
    raise_if_invalid_snapshot_id_v1(&self.asset_manager, snapshot_id).await?;
    // `None` expected parent: the branch must not exist yet.
    refs::update_branch(
        self.storage.as_ref(),
        &self.storage_settings,
        branch_name,
        snapshot_id.clone(),
        None,
    )
    .await
    .map_err(|e| match e {
        RefError {
            kind: RefErrorKind::Conflict { expected_parent, actual_parent },
            context,
        } => ICError {
            kind: RepositoryErrorKind::Conflict { expected_parent, actual_parent },
            context,
        },
        err => err.inject(),
    })?;
    Ok(())
}
#[instrument(skip(self))]
/// List branch names from the v1 ref files in storage.
async fn list_branches_v1(&self) -> RepositoryResult<BTreeSet<String>> {
    let listed =
        refs::list_branches(self.storage.as_ref(), &self.storage_settings).await;
    let branches = listed.inject()?;
    Ok(branches)
}
#[instrument(skip(self))]
/// List branch names recorded in the repo info object (spec v2).
async fn list_branches_v2(&self) -> RepositoryResult<BTreeSet<String>> {
    let (repo_info, _version) = self.get_repo_info().await?;
    let names = repo_info.branch_names().inject()?;
    Ok(names.map(|name| name.to_string()).collect())
}
#[instrument(skip(self))]
/// List all branch names, dispatching on the repository's spec version.
pub async fn list_branches(&self) -> RepositoryResult<BTreeSet<String>> {
    if self.spec_version == SpecVersionBin::V1 {
        self.list_branches_v1().await
    } else {
        self.list_branches_v2().await
    }
}
#[instrument(skip(self))]
/// Resolve `branch` to its tip snapshot via the v1 ref files.
async fn lookup_branch_v1(&self, branch: &str) -> RepositoryResult<SnapshotId> {
    let tip = refs::fetch_branch_tip_v1(
        self.storage.as_ref(),
        &self.storage_settings,
        branch,
    )
    .await
    .inject()?;
    Ok(tip.snapshot)
}
#[instrument(skip(self))]
/// Resolve `branch` from the repo info (spec v2); fetches the repo info
/// only when the caller doesn't already hold one.
///
/// The format-level `BranchNotFound` error is remapped to the ref-level
/// `RefNotFound` error, preserving its context.
async fn lookup_branch_v2(
    &self,
    branch: &str,
    repo_info: Option<&RepoInfo>,
) -> RepositoryResult<SnapshotId> {
    // `fetched` is declared first so the owned repo info outlives `ri`
    // when we have to fetch it ourselves.
    let fetched; let ri = match repo_info {
        Some(ri) => ri,
        None => {
            fetched = self.get_repo_info().await?.0;
            &fetched
        }
    };
    match ri.resolve_branch(branch) {
        Ok(snap) => Ok(snap),
        Err(IcechunkFormatError {
            kind: IcechunkFormatErrorKind::BranchNotFound { .. },
            context,
        }) => Err(ICError {
            kind: RepositoryErrorKind::Ref(RefErrorKind::RefNotFound(
                branch.to_string(),
            )),
            context,
        }),
        Err(err) => Err(err.inject()),
    }
}
#[instrument(skip(self))]
/// Resolve `branch` to the snapshot id at its tip.
pub async fn lookup_branch(&self, branch: &str) -> RepositoryResult<SnapshotId> {
    if self.spec_version == SpecVersionBin::V1 {
        self.lookup_branch_v1(branch).await
    } else {
        self.lookup_branch_v2(branch, None).await
    }
}
#[instrument(skip(self))]
/// Fetch snapshot metadata directly from storage (spec v1).
async fn lookup_snapshot_v1(
    &self,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<SnapshotInfo> {
    self.asset_manager.fetch_snapshot_info(snapshot_id).await
}
#[instrument(skip(self))]
/// Look up snapshot metadata in the repo info object (spec v2).
async fn lookup_snapshot_v2(
    &self,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<SnapshotInfo> {
    let (repo_info, _version) = self.get_repo_info().await?;
    repo_info.find_snapshot(snapshot_id).inject()
}
#[instrument(skip(self))]
/// Fetch metadata for `snapshot_id`, dispatching on the spec version.
pub async fn lookup_snapshot(
    &self,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<SnapshotInfo> {
    if self.spec_version == SpecVersionBin::V1 {
        self.lookup_snapshot_v1(snapshot_id).await
    } else {
        self.lookup_snapshot_v2(snapshot_id).await
    }
}
#[instrument(skip(self))]
/// List the manifest files referenced by snapshot `snapshot_id`.
pub async fn lookup_manifest_files(
    &self,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<impl Iterator<Item = ManifestFileInfo>> {
    let snap = self.asset_manager.fetch_snapshot(snapshot_id).await?;
    // Materialize eagerly so the returned iterator doesn't borrow `snap`.
    let files: Vec<_> = snap.manifest_files().try_collect().inject()?;
    Ok(files.into_iter())
}
#[instrument(skip(self))]
/// Move `branch` to `to_snapshot_id`, optionally verifying that it
/// currently points at `from_snapshot_id`.
pub async fn reset_branch(
    &self,
    branch: &str,
    to_snapshot_id: &SnapshotId,
    from_snapshot_id: Option<&SnapshotId>,
) -> RepositoryResult<()> {
    self.raise_if_cant_write("Cannot reset branch").await?;
    if self.spec_version == SpecVersionBin::V1 {
        self.reset_branch_v1(branch, to_snapshot_id, from_snapshot_id).await
    } else {
        self.reset_branch_v2(branch, to_snapshot_id, from_snapshot_id).await
    }
}
#[instrument(skip(self))]
/// Move a v1 branch ref to `to_snapshot_id`, compare-and-swapping
/// against the expected current tip.
async fn reset_branch_v1(
    &self,
    branch: &str,
    to_snapshot_id: &SnapshotId,
    from_snapshot_id: Option<&SnapshotId>,
) -> RepositoryResult<()> {
    raise_if_invalid_snapshot_id_v1(&self.asset_manager, to_snapshot_id).await?;
    // If the caller didn't specify an expected tip, use the current one.
    let branch_tip = match from_snapshot_id {
        Some(snap) => snap,
        None => &self.lookup_branch(branch).await?,
    };
    refs::update_branch(
        self.storage.as_ref(),
        &self.storage_settings,
        branch,
        to_snapshot_id.clone(),
        Some(branch_tip),
    )
    .await
    .inject()?;
    Ok(())
}
#[instrument(skip(self))]
/// Move `branch` to `to_snapshot_id` in the repo info object (spec v2),
/// retrying the read-modify-write on concurrent repo-info updates.
///
/// When `from_snapshot_id` is given, fails with
/// `RepositoryErrorKind::Conflict` unless the branch currently points
/// at it.
async fn reset_branch_v2(
    &self,
    branch: &str,
    to_snapshot_id: &SnapshotId,
    from_snapshot_id: Option<&SnapshotId>,
) -> RepositoryResult<()> {
    let do_update = |repo_info: Arc<RepoInfo>, backup_path: &str, _| {
        if let Some(from_snapshot_id) = from_snapshot_id {
            let actual = repo_info.resolve_branch(branch).inject()?;
            if &actual != from_snapshot_id {
                // Bug fix: report the branch's real tip as `actual_parent`.
                // Previously both fields echoed `from_snapshot_id`, so the
                // "expected != actual" message showed two identical ids.
                return Err(RepositoryError::capture(RepositoryErrorKind::Conflict {
                    expected_parent: Some(from_snapshot_id.clone()),
                    actual_parent: Some(actual),
                }));
            }
        }
        let num_updates = self.config.num_updates_per_repo_info_file();
        let new_repo = repo_info
            .update_branch(
                self.spec_version(),
                branch,
                to_snapshot_id,
                backup_path,
                num_updates,
            )
            .map_err(|err| match err {
                // Remap the format-level "branch not found" to the
                // ref-level error callers match on.
                IcechunkFormatError {
                    kind: IcechunkFormatErrorKind::BranchNotFound { .. },
                    context,
                } => ICError {
                    kind: RepositoryErrorKind::Ref(RefErrorKind::RefNotFound(
                        branch.to_string(),
                    )),
                    context,
                },
                err => err.inject(),
            });
        Ok(Arc::new(new_repo?))
    };
    let _ = self
        .asset_manager
        .update_repo_info(self.config.repo_update_retries().retries(), do_update)
        .await?;
    Ok(())
}
#[instrument(skip(self))]
/// Delete `branch`. Deleting the default branch is always rejected.
pub async fn delete_branch(&self, branch: &str) -> RepositoryResult<()> {
    self.raise_if_cant_write("Cannot delete branch").await?;
    if branch == Ref::DEFAULT_BRANCH {
        return Err(RepositoryError::capture(RepositoryErrorKind::CannotDeleteMain));
    }
    if self.spec_version == SpecVersionBin::V1 {
        self.delete_branch_v1(branch).await
    } else {
        self.delete_branch_v2(branch).await
    }
}
#[instrument(skip(self))]
/// Delete a branch's v1 ref files from storage.
async fn delete_branch_v1(&self, branch: &str) -> RepositoryResult<()> {
    let deleted =
        refs::delete_branch(self.storage.as_ref(), &self.storage_settings, branch)
            .await;
    deleted.inject()?;
    Ok(())
}
#[instrument(skip(self))]
/// Remove a branch from the repo info object (spec v2), retrying on
/// concurrent repo-info updates. The format-level `BranchNotFound` error
/// is remapped to the ref-level `RefNotFound`.
async fn delete_branch_v2(&self, branch: &str) -> RepositoryResult<()> {
    let num_updates = self.config.num_updates_per_repo_info_file();
    let do_update = |repo_info: Arc<RepoInfo>, backup_path: &str, _| {
        let new_repo = repo_info
            .delete_branch(self.spec_version(), branch, backup_path, num_updates)
            .map_err(|err| match err {
                IcechunkFormatError {
                    kind: IcechunkFormatErrorKind::BranchNotFound { .. },
                    context,
                } => ICError {
                    kind: RepositoryErrorKind::Ref(RefErrorKind::RefNotFound(
                        branch.to_string(),
                    )),
                    context,
                },
                err => err.inject(),
            });
        Ok(Arc::new(new_repo?))
    };
    let _ = self
        .asset_manager
        .update_repo_info(self.config.repo_update_retries().retries(), do_update)
        .await?;
    Ok(())
}
/// Delete a tag from the repository.
///
/// Requires writable storage; dispatches on the repository spec version.
#[instrument(skip(self))]
pub async fn delete_tag(&self, tag: &str) -> RepositoryResult<()> {
    self.raise_if_cant_write("Cannot delete tag").await?;
    match self.spec_version {
        SpecVersionBin::V2 => self.delete_tag_v2(tag).await,
        SpecVersionBin::V1 => self.delete_tag_v1(tag).await,
    }
}
/// Spec-v1 tag deletion: remove the tag's ref data from storage.
#[instrument(skip(self))]
async fn delete_tag_v1(&self, tag: &str) -> RepositoryResult<()> {
    let storage = self.storage.as_ref();
    refs::delete_tag(storage, &self.storage_settings, tag).await.inject()
}
/// Spec-v2 tag deletion: rewrite the repo-info file without the tag.
///
/// Fails if the `DELETE_TAG_FLAG` feature flag is disabled for this repo.
/// The update closure is handed to `AssetManager::update_repo_info`, which
/// runs it up to the configured retry count. A missing tag is surfaced as
/// `RefErrorKind::RefNotFound` carrying the tag name.
#[instrument(skip(self))]
async fn delete_tag_v2(&self, tag: &str) -> RepositoryResult<()> {
    let num_updates = self.config.num_updates_per_repo_info_file();
    // Receives the current repo info and the path to back it up to.
    let do_update = |repo_info: Arc<RepoInfo>, backup_path: &str, _| {
        // Checked inside the closure so each retry sees the flag state of
        // the repo info it was given.
        raise_if_feature_flag_disabled(
            repo_info.as_ref(),
            DELETE_TAG_FLAG,
            "tag delete",
        )
        .inject()?;
        let new_repo = repo_info
            .delete_tag(self.spec_version(), tag, backup_path, num_updates)
            .map_err(|err| match err {
                // Translate the format-level "tag not found" into the
                // ref-level error callers expect.
                IcechunkFormatError {
                    kind: IcechunkFormatErrorKind::TagNotFound { .. },
                    context,
                } => ICError {
                    kind: RepositoryErrorKind::Ref(RefErrorKind::RefNotFound(
                        tag.to_string(),
                    )),
                    context,
                },
                err => err.inject(),
            });
        Ok(Arc::new(new_repo?))
    };
    let _ = self
        .asset_manager
        .update_repo_info(self.config.repo_update_retries().retries(), do_update)
        .await?;
    Ok(())
}
/// Create a tag named `tag_name` pointing at `snapshot_id`.
///
/// Requires writable storage; dispatches on the repository spec version.
#[instrument(skip(self))]
pub async fn create_tag(
    &self,
    tag_name: &str,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<()> {
    self.raise_if_cant_write("Cannot create tag").await?;
    match self.spec_version {
        SpecVersionBin::V2 => self.create_tag_v2(tag_name, snapshot_id).await,
        SpecVersionBin::V1 => self.create_tag_v1(tag_name, snapshot_id).await,
    }
}
/// Spec-v1 tag creation: validate the snapshot exists, then write a tag ref.
#[instrument(skip(self))]
async fn create_tag_v1(
    &self,
    tag_name: &str,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<()> {
    // Refuse to tag a snapshot id that is not present in the store.
    raise_if_invalid_snapshot_id_v1(&self.asset_manager, snapshot_id).await?;
    let result = refs::create_tag(
        self.storage.as_ref(),
        &self.storage_settings,
        tag_name,
        snapshot_id.clone(),
    )
    .await;
    result.inject()?;
    Ok(())
}
/// Spec-v2 tag creation: rewrite the repo-info file with the new tag.
///
/// Fails if the snapshot id is unknown to the repo or if the
/// `CREATE_TAG_FLAG` feature flag is disabled. A pre-existing tag of the same
/// name is surfaced as `RefErrorKind::TagAlreadyExists`.
#[instrument(skip(self))]
async fn create_tag_v2(
    &self,
    tag_name: &str,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<()> {
    let num_updates = self.config.num_updates_per_repo_info_file();
    // Receives the current repo info and the path to back it up to; both
    // validations run inside so every retry re-checks against the repo info
    // it was given.
    let do_update = |repo_info: Arc<RepoInfo>, backup_path: &str, _| {
        raise_if_invalid_snapshot_id_v2(repo_info.as_ref(), snapshot_id)?;
        raise_if_feature_flag_disabled(
            repo_info.as_ref(),
            CREATE_TAG_FLAG,
            "tag creation",
        )
        .inject()?;
        let new_repo = repo_info
            .add_tag(
                self.spec_version(),
                tag_name,
                snapshot_id,
                backup_path,
                num_updates,
            )
            .map_err(|err| match err {
                // Translate the format-level duplicate-tag error into the
                // ref-level error callers expect.
                IcechunkFormatError {
                    kind: IcechunkFormatErrorKind::TagAlreadyExists { .. },
                    context,
                } => ICError {
                    kind: RepositoryErrorKind::Ref(RefErrorKind::TagAlreadyExists(
                        tag_name.to_string(),
                    )),
                    context,
                },
                err => err.inject(),
            });
        Ok(Arc::new(new_repo?))
    };
    let _ = self
        .asset_manager
        .update_repo_info(self.config.repo_update_retries().retries(), do_update)
        .await?;
    Ok(())
}
/// Spec-v1 tag listing: enumerate tag refs directly from storage.
#[instrument(skip(self))]
async fn list_tags_v1(&self) -> RepositoryResult<BTreeSet<String>> {
    refs::list_tags(self.storage.as_ref(), &self.storage_settings).await.inject()
}
/// Spec-v2 tag listing: read tag names from the repo-info file.
#[instrument(skip(self))]
async fn list_tags_v2(&self) -> RepositoryResult<BTreeSet<String>> {
    let (repo_info, _version) = self.get_repo_info().await?;
    let names = repo_info.tag_names().inject()?;
    Ok(names.map(|name| name.to_string()).collect())
}
/// List all tag names in the repository, sorted.
#[instrument(skip(self))]
pub async fn list_tags(&self) -> RepositoryResult<BTreeSet<String>> {
    match self.spec_version {
        SpecVersionBin::V2 => self.list_tags_v2().await,
        SpecVersionBin::V1 => self.list_tags_v1().await,
    }
}
/// Spec-v1 tag lookup: fetch the tag ref and return its snapshot id.
#[instrument(skip(self))]
async fn lookup_tag_v1(&self, tag: &str) -> RepositoryResult<SnapshotId> {
    refs::fetch_tag(self.storage.as_ref(), &self.storage_settings, tag)
        .await
        .inject()
        .map(|ref_data| ref_data.snapshot)
}
/// Spec-v2 tag lookup: resolve the tag against a `RepoInfo`.
///
/// Uses the caller-provided `repo_info` when given; otherwise fetches a fresh
/// one. A missing tag is surfaced as `RefErrorKind::RefNotFound` carrying the
/// tag name.
#[instrument(skip(self))]
async fn lookup_tag_v2(
    &self,
    tag: &str,
    repo_info: Option<&RepoInfo>,
) -> RepositoryResult<SnapshotId> {
    // `fetched` keeps the freshly-fetched Arc alive while we borrow it.
    let fetched;
    let ri = if let Some(ri) = repo_info {
        ri
    } else {
        fetched = self.get_repo_info().await?.0;
        &fetched
    };
    ri.resolve_tag(tag).map_err(|err| match err {
        // Translate the format-level "tag not found" into the ref-level
        // error callers expect; everything else passes through.
        IcechunkFormatError {
            kind: IcechunkFormatErrorKind::TagNotFound { .. },
            context,
        } => ICError {
            kind: RepositoryErrorKind::Ref(RefErrorKind::RefNotFound(
                tag.to_string(),
            )),
            context,
        },
        other => other.inject(),
    })
}
/// Resolve a tag name to the snapshot id it points at.
#[instrument(skip(self))]
pub async fn lookup_tag(&self, tag: &str) -> RepositoryResult<SnapshotId> {
    match self.spec_version {
        SpecVersionBin::V2 => self.lookup_tag_v2(tag, None).await,
        SpecVersionBin::V1 => self.lookup_tag_v1(tag).await,
    }
}
/// Spec-v1 resolution of a ref-like version (snapshot id, tag, or branch
/// tip) to a concrete `SnapshotId`.
#[instrument(skip(self))]
async fn resolve_ref_version_v1(
    &self,
    version: &RefVersionInfo,
) -> RepositoryResult<SnapshotId> {
    match version {
        RefVersionInfo::SnapshotId(sid) => {
            // Only accept ids whose snapshot actually exists in the store.
            raise_if_invalid_snapshot_id_v1(self.asset_manager().as_ref(), sid)
                .await?;
            Ok(sid.clone())
        }
        RefVersionInfo::TagRef(tag) => {
            let ref_data =
                refs::fetch_tag(self.storage.as_ref(), &self.storage_settings, tag)
                    .await
                    .inject()?;
            Ok(ref_data.snapshot)
        }
        RefVersionInfo::BranchTipRef(branch) => {
            let ref_data = refs::fetch_branch_tip_v1(
                self.storage.as_ref(),
                &self.storage_settings,
                branch,
            )
            .await
            .inject()?;
            Ok(ref_data.snapshot)
        }
    }
}
#[instrument(skip(self, repo_info))]
async fn resolve_ref_version_v2(
&self,
repo_info: &RepoInfo,
version: &RefVersionInfo,
) -> RepositoryResult<SnapshotId> {
match version {
RefVersionInfo::SnapshotId(sid) => {
raise_if_invalid_snapshot_id_v2(repo_info, sid)?;
Ok(sid.clone())
}
RefVersionInfo::TagRef(tag) => self.lookup_tag_v2(tag, Some(repo_info)).await,
RefVersionInfo::BranchTipRef(branch) => {
self.lookup_branch_v2(branch, Some(repo_info)).await
}
}
}
/// Resolve any `VersionInfo` (snapshot id, tag, branch tip, or branch
/// as-of timestamp) to a concrete `SnapshotId`.
#[instrument(skip(self))]
pub async fn resolve_version(
    &self,
    version: &VersionInfo,
) -> RepositoryResult<SnapshotId> {
    self.resolve_version_using(version, None).await
}
/// Resolve a `VersionInfo`, optionally reusing an already-fetched
/// `RepoInfo` (spec v2 only) to avoid a redundant fetch.
async fn resolve_version_using(
    &self,
    version: &VersionInfo,
    repo_info: Option<&RepoInfo>,
) -> RepositoryResult<SnapshotId> {
    match self.spec_version {
        // v1 has no repo-info file; `repo_info` is ignored.
        SpecVersionBin::V1 => self.resolve_version_v1(version).await,
        SpecVersionBin::V2 => {
            // `fetched` keeps a freshly-fetched Arc alive while borrowed.
            let fetched;
            let ri = match repo_info {
                Some(ri) => ri,
                None => {
                    fetched = self.get_repo_info().await?.0;
                    &fetched
                }
            };
            self.resolve_version_v2(ri, version).await
        }
    }
}
/// Fetch the current `RepoInfo` and its storage version through the asset
/// manager.
async fn get_repo_info(
    &self,
) -> RepositoryResult<(Arc<RepoInfo>, storage::VersionInfo)> {
    self.asset_manager.fetch_repo_info().await
}
/// Spec-v1 version resolution.
///
/// Ref-like versions delegate to `resolve_ref_version_v1`. The `AsOf`
/// case walks the branch's ancestry (newest first), skips snapshots
/// flushed after `at`, and returns the first remaining one; if every
/// ancestor is newer than `at`, the spec is invalid.
#[instrument(skip(self))]
async fn resolve_version_v1(
    &self,
    version: &VersionInfo,
) -> RepositoryResult<SnapshotId> {
    match version.try_into() {
        Ok(ref_version_info) => self.resolve_ref_version_v1(&ref_version_info).await,
        // `TryFrom` fails exactly for `AsOf`, yielding its fields.
        Err((branch, at)) => {
            let tip = RefVersionInfo::BranchTipRef(branch.clone());
            let snap = self
                .ancestry_ref_v1(&tip)
                .await?
                // Drop ancestors strictly newer than the requested time.
                .try_skip_while(|parent| ready(Ok(parent.flushed_at > at)))
                .take(1)
                .try_collect::<Vec<_>>()
                .await?;
            match snap.into_iter().next() {
                Some(snap) => Ok(snap.id),
                // No ancestor is old enough.
                None => Err(RepositoryError::capture(
                    RepositoryErrorKind::InvalidAsOfSpec {
                        branch: branch.clone(),
                        at,
                    },
                )),
            }
        }
    }
}
/// Spec-v2 version resolution against an in-memory `RepoInfo`.
///
/// Ref-like versions delegate to `resolve_ref_version_v2`. The `AsOf`
/// case walks the branch's ancestry (newest first), skips snapshots
/// flushed after `at`, and returns the first remaining one.
#[instrument(skip(self, repo_info))]
async fn resolve_version_v2(
    &self,
    repo_info: &RepoInfo,
    version: &VersionInfo,
) -> RepositoryResult<SnapshotId> {
    match version.try_into() {
        Ok(ref_version_info) => {
            self.resolve_ref_version_v2(repo_info, &ref_version_info).await
        }
        // `TryFrom` fails exactly for `AsOf`, yielding its fields.
        Err((branch, at)) => {
            let snap_id = repo_info.resolve_branch(branch.as_str()).inject()?;
            let ancestry = repo_info.ancestry(&snap_id).inject()?;
            let snap: Vec<_> = ancestry
                .skip_while(|parent| {
                    if let Ok(parent) = parent {
                        parent.flushed_at > at
                    } else {
                        // Stop skipping on an error so `try_collect`
                        // below surfaces it instead of swallowing it.
                        false
                    }
                })
                .take(1)
                .try_collect()
                .inject()?;
            match snap.into_iter().next() {
                Some(snap) => Ok(snap.id),
                // No ancestor is old enough for the requested time.
                None => Err(RepositoryError::capture(
                    RepositoryErrorKind::InvalidAsOfSpec {
                        branch: branch.clone(),
                        at,
                    },
                )),
            }
        }
    }
}
/// Compute the diff between two versions of the repository.
///
/// `from` must be an ancestor of `to`; otherwise
/// `BadSnapshotChainForDiff` is returned. The diff is built by folding
/// the transaction logs of every snapshot in `(from, to]` and then
/// materializing it against read-only sessions at both endpoints.
#[instrument(skip(self))]
pub async fn diff(
    &self,
    from: &VersionInfo,
    to: &VersionInfo,
) -> SessionResult<Diff> {
    // Fetch the repo info once (v2 only) and share it across every
    // resolution below.
    let repo_info = match self.spec_version {
        SpecVersionBin::V1 => None,
        SpecVersionBin::V2 => Some(self.get_repo_info().await.inject()?.0),
    };
    let from =
        self.resolve_version_using(from, repo_info.as_deref()).await.inject()?;
    let to = self.resolve_version_using(to, repo_info.as_deref()).await.inject()?;
    // Collect `to`'s ancestry, newest first, stopping before `from`.
    let all_snaps = self
        .ancestry_using(&VersionInfo::SnapshotId(to), repo_info.clone())
        .await
        .inject()?
        .try_take_while(|snap_info| ready(Ok(snap_info.id != from)))
        .try_collect::<Vec<_>>()
        .await
        .inject()?;
    // The oldest collected snapshot must have `from` as its parent,
    // i.e. `from` must actually be an ancestor of `to`.
    if all_snaps.last().and_then(|info| info.parent_id.as_ref()) != Some(&from) {
        return Err(SessionError::capture(SessionErrorKind::BadSnapshotChainForDiff));
    }
    // Fetch every snapshot's transaction log, preserving order. In v1
    // the initial snapshot has no transaction log, so it is skipped.
    let fut: FuturesOrdered<_> = all_snaps
        .iter()
        .filter_map(|snap_info| {
            if self.spec_version == SpecVersionBin::V1 && snap_info.is_initial() {
                None
            } else {
                Some(
                    self.asset_manager
                        .fetch_transaction_log(&snap_info.id)
                        .in_current_span(),
                )
            }
        })
        .collect();
    // Accumulate all changes into a single DiffBuilder.
    let builder = fut
        .try_fold(DiffBuilder::default(), |mut res, log| {
            ready(match res.add_changes(log.as_ref()) {
                Ok(_) => Ok(res),
                Err(e) => Err(RepositoryError::capture(e.kind)),
            })
        })
        .await
        .inject()?;
    // Materialize the diff using sessions at the two endpoints. The
    // first collected snapshot is `to` itself (ancestry is newest
    // first); an empty chain means the versions were equal/invalid.
    if let Some(to_snap) = all_snaps.first().as_ref().map(|snap| snap.id.clone()) {
        let from_session = self
            .readonly_session_using(
                &VersionInfo::SnapshotId(from),
                repo_info.as_deref(),
            )
            .await
            .inject()?;
        let to_session = self
            .readonly_session_using(
                &VersionInfo::SnapshotId(to_snap),
                repo_info.as_deref(),
            )
            .await
            .inject()?;
        builder.to_diff(&from_session, &to_session).await
    } else {
        Err(SessionError::capture(SessionErrorKind::BadSnapshotChainForDiff))
    }
}
/// Open a read-only session at the given version.
#[instrument(skip(self))]
pub async fn readonly_session(
    &self,
    version: &VersionInfo,
) -> RepositoryResult<Session> {
    self.readonly_session_using(version, None).await
}
/// Open a read-only session at `version`, optionally reusing an
/// already-fetched `RepoInfo` (spec v2) to resolve it.
async fn readonly_session_using(
    &self,
    version: &VersionInfo,
    repo_info: Option<&RepoInfo>,
) -> RepositoryResult<Session> {
    let snapshot_id = self.resolve_version_using(version, repo_info).await?;
    // Warm the manifest cache in the background for this snapshot.
    self.preload_manifests(snapshot_id.clone());
    Ok(Session::create_readonly_session(
        self.config.clone(),
        self.storage_settings.clone(),
        Arc::clone(&self.storage),
        Arc::clone(&self.asset_manager),
        Arc::clone(&self.virtual_resolver),
        snapshot_id,
    ))
}
/// Error out with `ReadonlyRepository` unless the repo's status is
/// `Online`. Only spec v2+ repositories carry a status; v1 always passes.
async fn fail_unless_online_status(&self, error_msg: &str) -> RepositoryResult<()> {
    if self.spec_version() < SpecVersionBin::V2 {
        return Ok(());
    }
    let status = self.get_status().await?;
    if status.availability == RepoAvailability::Online {
        Ok(())
    } else {
        Err(RepositoryError::capture(RepositoryErrorKind::ReadonlyRepository(
            format!("{error_msg}; {}", status.error_msg()),
        )))
    }
}
/// Open a writable session on the tip of `branch`.
///
/// Fails if storage is read-only or the repository is not online.
#[instrument(skip(self))]
pub async fn writable_session(&self, branch: &str) -> RepositoryResult<Session> {
    self.raise_if_cant_write("Cannot create writable session").await?;
    self.fail_unless_online_status("Cannot create writable session").await?;
    let snapshot_id = self.lookup_branch(branch).await?;
    // Warm the manifest cache in the background for this snapshot.
    self.preload_manifests(snapshot_id.clone());
    Ok(Session::create_writable_session(
        self.config.clone(),
        self.storage_settings.clone(),
        Arc::clone(&self.storage),
        Arc::clone(&self.asset_manager),
        Arc::clone(&self.virtual_resolver),
        Some(branch.to_string()),
        snapshot_id,
        self.default_commit_metadata.clone(),
    ))
}
/// Open a rearrange session (node moves) on the tip of `branch`.
///
/// Requires writable storage, spec v2+, online status, and the
/// `MOVE_NODE_FLAG` feature flag.
#[instrument(skip(self))]
pub async fn rearrange_session(&self, branch: &str) -> RepositoryResult<Session> {
    self.raise_if_cant_write("Cannot create rearrange session").await?;
    self.asset_manager().fail_unless_spec_at_least(SpecVersionBin::V2)?;
    self.fail_unless_online_status("Cannot create rearrange session").await?;
    // Fetch repo info once and reuse it for both the branch lookup and
    // the feature-flag check.
    let (ri, _) = self.asset_manager().fetch_repo_info().await?;
    let snapshot_id = self.lookup_branch_v2(branch, Some(&ri)).await?;
    raise_if_feature_flag_disabled(
        ri.as_ref(),
        MOVE_NODE_FLAG,
        "create rearrange session",
    )
    .inject()?;
    let session = Session::create_rearrange_session(
        self.config.clone(),
        self.storage_settings.clone(),
        Arc::clone(&self.storage),
        Arc::clone(&self.asset_manager),
        Arc::clone(&self.virtual_resolver),
        branch.to_string(),
        snapshot_id.clone(),
        self.default_commit_metadata.clone(),
    );
    // Warm the manifest cache in the background for this snapshot.
    self.preload_manifests(snapshot_id);
    Ok(session)
}
#[instrument(skip(self))]
fn preload_manifests(&self, snapshot_id: SnapshotId) {
debug!("Preloading manifests");
let asset_manager = Arc::clone(self.asset_manager());
let preload_config = self.config().manifest().preload().clone();
let max_arrays_to_scan = preload_config.max_arrays_to_scan() as usize;
if preload_config.max_total_refs() == 0
|| matches!(preload_config.preload_if(), ManifestPreloadCondition::False)
{
return;
}
tokio::spawn(async move {
let mut loaded_manifests: HashSet<ManifestId> = HashSet::new();
let mut loaded_refs: u32 = 0;
let futures = FuturesUnordered::new();
if let Ok(snap) = asset_manager.fetch_snapshot(&snapshot_id).await {
let snap_c = Arc::clone(&snap);
for node in snap
.iter_arc(&Path::root())
.filter_ok(|node| node.node_type() == NodeType::Array)
.take(max_arrays_to_scan)
{
match node {
Err(err) => {
error!(error=%err, "Error retrieving snapshot nodes");
}
Ok(node) => match node.node_data {
NodeData::Group => {}
NodeData::Array { manifests, .. } => {
for manifest in manifests {
if !loaded_manifests.contains(&manifest.object_id) {
let manifest_id = manifest.object_id;
if let Some(manifest_info) = snap_c
.manifest_info(&manifest_id)
.ok()
.flatten()
&& loaded_refs + manifest_info.num_chunk_refs
<= preload_config.max_total_refs()
&& preload_config
.preload_if()
.matches(&node.path, &manifest_info)
{
let size_bytes = manifest_info.size_bytes;
let asset_manager =
Arc::clone(&asset_manager);
let manifest_id_c = manifest_id.clone();
let path = node.path.clone();
futures.push(async move {
trace!(
"Preloading manifest {} for array {}",
&manifest_id_c, path
);
if let Err(err) = asset_manager
.fetch_manifest(&manifest_id_c, size_bytes)
.await
{
error!(
"Failure pre-loading manifest {}: {}",
&manifest_id_c, err
);
}
});
loaded_manifests.insert(manifest_id);
loaded_refs += manifest_info.num_chunk_refs;
}
}
}
}
},
}
}
futures.collect::<()>().await;
};
().in_current_span()
});
}
async fn raise_if_cant_write(&self, msg: impl Into<String>) -> RepositoryResult<()> {
raise_if_cant_write(self.storage().as_ref(), msg).await
}
}
/// Return a `ReadonlyStorage` error (carrying `msg`) unless `storage`
/// reports itself writable.
async fn raise_if_cant_write(
    storage: &dyn Storage,
    msg: impl Into<String>,
) -> RepositoryResult<()> {
    if !storage.can_write().await.inject()? {
        return Err(RepositoryError::capture(RepositoryErrorKind::ReadonlyStorage(
            msg.into(),
        )));
    }
    Ok(())
}
/// Snapshot-ancestry iterator for spec-v2 repositories.
///
/// `it` borrows (via an unsafe pointer cast in `AncestryIteratorV2::new`)
/// from the `RepoInfo` behind `_repo_info`; the field exists only to keep
/// that allocation alive as long as the iterator.
struct AncestryIteratorV2 {
    _repo_info: Arc<RepoInfo>,
    it: Box<dyn Iterator<Item = RepositoryResult<SnapshotInfo>> + Send>,
}
impl Iterator for AncestryIteratorV2 {
    type Item = RepositoryResult<SnapshotInfo>;
    // Plain delegation to the boxed inner iterator.
    fn next(&mut self) -> Option<Self::Item> {
        self.it.next()
    }
}
impl AncestryIteratorV2 {
    /// Build a self-referential ancestry iterator rooted at `snapshot_id`.
    #[expect(unsafe_code)]
    fn new(repo_info: Arc<RepoInfo>, snapshot_id: &SnapshotId) -> RepositoryResult<Self> {
        // SAFETY: the iterator returned by `ancestry` borrows from the
        // `RepoInfo` behind the Arc. That heap allocation never moves, and
        // `Self` stores a clone of the Arc in `_repo_info`, keeping the
        // allocation alive at least as long as `it`. The borrow is shared
        // (Arc only hands out `&RepoInfo`), so no aliasing `&mut` can
        // exist. This is sound as long as `it` is never moved out of the
        // struct separately from `_repo_info` — NOTE(review): nothing in
        // the type system enforces that; confirm no code destructures this
        // struct.
        let it = unsafe {
            let repo_info_ref = &*Arc::as_ptr(&repo_info);
            Box::new(repo_info_ref.ancestry(snapshot_id).inject()?.map(|e| e.inject()))
        };
        Ok(Self { _repo_info: repo_info, it })
    }
}
impl ManifestPreloadCondition {
    /// Decide whether a manifest (identified by its array `path` and file
    /// `info`) should be preloaded under this condition.
    ///
    /// Invalid regexes evaluate to `false` rather than erroring.
    /// NOTE(review): the regex is recompiled on every call; callers that
    /// evaluate the condition per manifest pay that cost each time.
    pub fn matches(&self, path: &Path, info: &ManifestFileInfo) -> bool {
        match self {
            // Logical combinators: short-circuiting any/all.
            ManifestPreloadCondition::Or(vec) => {
                vec.iter().any(|c| c.matches(path, info))
            }
            ManifestPreloadCondition::And(vec) => {
                vec.iter().all(|c| c.matches(path, info))
            }
            // Match against the full node path.
            ManifestPreloadCondition::PathMatches { regex } => Regex::new(regex)
                .map(|regex| regex.is_match(path.to_string().as_bytes()))
                .unwrap_or(false),
            // Match against the final path component only; a path with no
            // name (e.g. the root) never matches.
            ManifestPreloadCondition::NameMatches { regex } => Regex::new(regex)
                .map(|regex| {
                    path.name()
                        .map(|name| regex.is_match(name.as_bytes()))
                        .unwrap_or(false)
                })
                .unwrap_or(false),
            // Inclusive/exclusive bounds as expressed by the range pair.
            ManifestPreloadCondition::NumRefs { from, to } => {
                (*from, *to).contains(&info.num_chunk_refs)
            }
            ManifestPreloadCondition::True => true,
            ManifestPreloadCondition::False => false,
        }
    }
}
/// Validate each supplied credential against its virtual-chunk container.
///
/// Prefixes without a configured container are skipped; the first
/// credential rejected by its container aborts with a `StorageError`.
fn validate_credentials(
    config: &RepositoryConfig,
    creds: &HashMap<String, Option<Credentials>>,
) -> RepositoryResult<()> {
    for (url_prefix, cred) in creds {
        let Some(container) = config.get_virtual_chunk_container(url_prefix) else {
            continue;
        };
        if let Err(error) = container.validate_credentials(cred.as_ref()) {
            return Err(RepositoryError::capture(RepositoryErrorKind::StorageError(
                StorageErrorKind::Other(error),
            )));
        }
    }
    Ok(())
}
/// Spec-v1 snapshot-id validation: error unless the snapshot can actually
/// be fetched from the store.
async fn raise_if_invalid_snapshot_id_v1(
    asset_manager: &AssetManager,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<()> {
    // The fetched snapshot itself is discarded; only existence matters.
    asset_manager.fetch_snapshot(snapshot_id).await.inject()?;
    Ok(())
}
/// Spec-v2 snapshot-id validation: error unless the snapshot is recorded
/// in the repo-info file.
fn raise_if_invalid_snapshot_id_v2(
    repo_info: &RepoInfo,
    snapshot_id: &SnapshotId,
) -> RepositoryResult<()> {
    // The lookup result itself is discarded; only existence matters.
    repo_info.find_snapshot(snapshot_id).inject()?;
    Ok(())
}
#[cfg(test)]
mod tests {
use futures::TryStreamExt as _;
use std::{
collections::HashMap, error::Error, iter::zip, num::NonZeroU16, path::PathBuf,
sync::Arc,
};
use bytes::Bytes;
use icechunk_macros::tokio_test;
use itertools::enumerate;
use rstest::rstest;
use rstest_reuse::{self, *};
use storage::logging::LoggingStorage;
use tempfile::TempDir;
use crate::{
Repository, Storage,
config::{
CachingConfig, ManifestConfig, ManifestPreloadConfig, ManifestSplitCondition,
ManifestSplitDim, ManifestSplitDimCondition, ManifestSplittingConfig,
ManifestVirtualChunkLocationCompressionConfig, RepositoryConfig,
},
conflicts::basic_solver::BasicConflictSolver,
format::{
ByteRange, ChunkIndices, MANIFESTS_FILE_PATH,
manifest::{
ChunkPayload, ManifestSplits, VirtualChunkLocation, VirtualChunkRef,
},
snapshot::{ArrayShape, DimensionName},
},
migrations::migrate_1_to_2,
ops::manifests::rewrite_manifests,
session::{CommitMethod, SessionError, get_chunk},
storage::new_in_memory_storage,
test_utils::spec_version_cases,
};
use super::*;
/// Convert a multi-dimensional `index` into a flat, row-major (C-order)
/// offset for an array of the given `shape`.
fn ravel_multi_index(index: &[u32], shape: &[u32]) -> u32 {
    let mut flat = 0u32;
    let mut stride = 1u32;
    // Walk dimensions from fastest-varying (last) to slowest (first),
    // accumulating the stride as we go.
    for (i, size) in index.iter().copied().zip(shape.iter().copied()).rev() {
        flat += i * stride;
        stride *= size;
    }
    flat
}
/// Assert that storage currently holds exactly `total_manifests` manifests.
async fn assert_manifest_count(
    asset_manager: &Arc<AssetManager>,
    total_manifests: usize,
) {
    let listing = asset_manager.list_manifests().await.unwrap();
    let actual = listing.count().await;
    assert_eq!(
        total_manifests, actual,
        "Mismatch in manifest count: expected {total_manifests}, but got {actual}",
    );
}
/// Config persistence round-trip: defaults on create, overrides on open,
/// `save_config` + `fetch_config`, and merge semantics of `reopen`.
#[tokio_test]
#[apply(spec_version_cases)]
async fn test_repository_persistent_config(
    spec_version: SpecVersionBin,
) -> Result<(), Box<dyn Error>> {
    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let repo = Repository::create(
        None,
        Arc::clone(&storage),
        HashMap::new(),
        Some(spec_version),
        true,
    )
    .await?;
    // A repo created without a config reports the defaults (plus the
    // storage backend's own default settings).
    let expected_default = RepositoryConfig {
        storage: Some(storage.default_settings().await?),
        ..Default::default()
    };
    assert_eq!(repo.config(), &expected_default);
    // Nothing has been persisted yet.
    assert!(Repository::fetch_config(Arc::clone(&storage)).await?.is_none());
    let repo = Repository::open(None, Arc::clone(&storage), HashMap::new()).await?;
    assert_eq!(repo.config(), &expected_default);
    // Opening with an explicit config overrides the default in memory.
    let repo = Repository::open(
        Some(RepositoryConfig {
            inline_chunk_threshold_bytes: Some(42),
            ..Default::default()
        }),
        Arc::clone(&storage),
        HashMap::new(),
    )
    .await?;
    assert_eq!(repo.config().inline_chunk_threshold_bytes(), 42);
    // Persist the config and verify it round-trips through storage.
    let version = repo.save_config().await?;
    assert_ne!(version, storage::VersionInfo::for_creation());
    assert_eq!(
        Repository::fetch_config(Arc::clone(&storage))
            .await?
            .unwrap()
            .0
            .inline_chunk_threshold_bytes(),
        42
    );
    // A fresh open now picks up the persisted value.
    let repo = Repository::open(None, Arc::clone(&storage), HashMap::new()).await?;
    assert_eq!(repo.config().inline_chunk_threshold_bytes(), 42);
    // New repo created with an explicit config on fresh storage.
    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let config = RepositoryConfig {
        inline_chunk_threshold_bytes: Some(20),
        caching: Some(CachingConfig {
            num_chunk_refs: Some(21),
            ..CachingConfig::default()
        }),
        ..RepositoryConfig::default()
    };
    let repo = Repository::create(
        Some(config),
        Arc::clone(&storage),
        HashMap::new(),
        Some(spec_version),
        true,
    )
    .await?;
    assert_eq!(repo.config().inline_chunk_threshold_bytes(), 20);
    assert_eq!(repo.config().caching().num_chunk_refs(), 21);
    // Reopen with a partial config: unspecified fields keep their previous
    // values, specified ones are replaced.
    let config = RepositoryConfig {
        caching: Some(CachingConfig {
            num_chunk_refs: Some(100),
            ..CachingConfig::default()
        }),
        ..RepositoryConfig::default()
    };
    let repo = repo.reopen(Some(config.clone()), None).await?;
    assert_eq!(repo.config().inline_chunk_threshold_bytes(), 20);
    assert_eq!(repo.config().caching().num_chunk_refs(), 100);
    // The merged config is what gets persisted.
    let version = repo.save_config().await?;
    assert_ne!(version, storage::VersionInfo::for_creation());
    assert_eq!(&Repository::fetch_config(storage).await?.unwrap().0, repo.config());
    Ok(())
}
/// Branch/tag management: initial state, rejection of refs to unknown
/// snapshots, create/list/delete branches (main is protected), and tag
/// create/lookup.
#[tokio_test]
#[apply(spec_version_cases)]
async fn test_manage_refs(
    #[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn Error>> {
    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let repo = Repository::create(
        None,
        Arc::clone(&storage),
        HashMap::new(),
        Some(spec_version),
        true,
    )
    .await?;
    // A fresh repo has only `main` and no tags.
    let initial_branches = repo.list_branches().await?;
    assert_eq!(initial_branches, BTreeSet::from(["main".into()]));
    let initial_tags = repo.list_tags().await?;
    assert_eq!(initial_tags, BTreeSet::new());
    // Refs pointing at nonexistent snapshots are rejected and leave no
    // trace behind.
    assert!(repo.create_tag("foo", &SnapshotId::random()).await.is_err());
    assert_eq!(repo.list_tags().await?, BTreeSet::new());
    assert!(repo.create_branch("bar", &SnapshotId::random()).await.is_err());
    assert_eq!(repo.list_branches().await?, BTreeSet::from(["main".into()]));
    // Branches created from a real snapshot show up in the listing.
    let initial_snapshot = repo.lookup_branch("main").await?;
    repo.create_branch("branch1", &initial_snapshot).await?;
    repo.create_branch("branch2", &initial_snapshot).await?;
    let branches = repo.list_branches().await?;
    assert_eq!(
        branches,
        BTreeSet::from([
            "main".to_string(),
            "branch1".to_string(),
            "branch2".to_string()
        ])
    );
    // `main` can never be deleted.
    assert!(matches!(
        repo.delete_branch("main").await,
        Err(RepositoryError { kind: RepositoryErrorKind::CannotDeleteMain, .. })
    ));
    repo.delete_branch("branch1").await?;
    let branches = repo.list_branches().await?;
    assert_eq!(branches, BTreeSet::from(["main".to_string(), "branch2".to_string()]));
    // Tags round-trip through create/list/lookup.
    repo.create_tag("tag1", &initial_snapshot).await?;
    let tags = repo.list_tags().await?;
    assert_eq!(tags, BTreeSet::from(["tag1".to_string()]));
    let tag_snapshot = repo.lookup_tag("tag1").await?;
    assert_eq!(tag_snapshot, initial_snapshot);
    Ok(())
}
/// The default preload condition should reject these sample paths/sizes:
/// a generic array name, a near-miss name ("nottime"), and a matching-ish
/// name whose manifest is too large.
#[test]
fn test_manifest_preload_default_condition() {
    let condition =
        RepositoryConfig::default().manifest().preload().preload_if().clone();
    assert!(!condition.matches(
        &"/array".try_into().unwrap(),
        &ManifestFileInfo {
            id: ManifestId::random(),
            size_bytes: 1,
            num_chunk_refs: 1
        }
    ));
    assert!(!condition.matches(
        &"/nottime".try_into().unwrap(),
        &ManifestFileInfo {
            id: ManifestId::random(),
            size_bytes: 1,
            num_chunk_refs: 1
        }
    ));
    // Even a coordinate-like name is rejected when the ref count is huge.
    assert!(!condition.matches(
        &"/time".try_into().unwrap(),
        &ManifestFileInfo {
            id: ManifestId::random(),
            size_bytes: 1,
            num_chunk_refs: 1_000_000
        }
    ));
}
/// Reopen `repo` with default manifest preloading and the given manifest
/// splitting sizes (`None` means the splitting defaults).
async fn reopen_repo_with_new_splitting_config(
    repo: &Repository,
    split_sizes: Option<Vec<(ManifestSplitCondition, Vec<ManifestSplitDim>)>>,
) -> Repository {
    let manifest = ManifestConfig {
        preload: Some(ManifestPreloadConfig {
            max_total_refs: None,
            preload_if: None,
            max_arrays_to_scan: None,
        }),
        splitting: Some(ManifestSplittingConfig { split_sizes }),
        virtual_chunk_location_compression: None,
    };
    let config =
        RepositoryConfig { manifest: Some(manifest), ..RepositoryConfig::default() };
    repo.reopen(Some(config), None).await.unwrap()
}
/// Create a repository configured with `split_config`, then commit an
/// initial structure: a root group plus one array at `path` with the given
/// shape and dimension names.
///
/// Uses `storage` when provided, otherwise a fresh in-memory backend.
async fn create_repo_with_split_manifest_config(
    path: &Path,
    shape: &ArrayShape,
    dimension_names: &Option<Vec<DimensionName>>,
    split_config: &ManifestSplittingConfig,
    storage: Option<Arc<dyn Storage + Send + Sync>>,
    spec_version: SpecVersionBin,
) -> Result<Repository, Box<dyn Error>> {
    let backend: Arc<dyn Storage + Send + Sync> =
        storage.unwrap_or(new_in_memory_storage().await?);
    let storage = Arc::clone(&backend);
    let man_config = ManifestConfig {
        preload: Some(ManifestPreloadConfig {
            max_total_refs: None,
            preload_if: None,
            max_arrays_to_scan: None,
        }),
        splitting: Some(split_config.clone()),
        virtual_chunk_location_compression: None,
    };
    let config = RepositoryConfig {
        manifest: Some(man_config),
        ..RepositoryConfig::default()
    };
    let repository = Repository::create(
        Some(config),
        storage,
        HashMap::new(),
        Some(spec_version),
        true,
    )
    .await?;
    // Commit the initial group + array so tests start from a non-empty
    // repository.
    let mut session = repository.writable_session("main").await?;
    let def = Bytes::from_static(br#"{"this":"array"}"#);
    session.add_group(Path::root(), def.clone()).await?;
    session
        .add_array(path.clone(), shape.clone(), dimension_names.clone(), def.clone())
        .await?;
    session.commit("initialized").max_concurrent_nodes(8).execute().await?;
    Ok(repository)
}
/// Resizing an array should rewrite its manifest when chunks fall out of
/// bounds (shrink) but not when the array only grows.
#[tokio_test]
#[apply(spec_version_cases)]
async fn test_resize_rewrites_manifests(
    #[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn Error>> {
    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    // Inline threshold 0 forces every chunk into a manifest.
    let repo = Repository::create(
        Some(RepositoryConfig {
            inline_chunk_threshold_bytes: Some(0),
            ..Default::default()
        }),
        Arc::clone(&storage),
        HashMap::new(),
        Some(spec_version),
        true,
    )
    .await?;
    let mut session = repo.writable_session("main").await?;
    session.add_group(Path::root(), Bytes::copy_from_slice(b"")).await?;
    let array_path: Path = "/array".to_string().try_into().unwrap();
    let shape = ArrayShape::new(vec![(4, 4)]).unwrap();
    let dimension_names = Some(vec!["t".into()]);
    let array_def = Bytes::from_static(br#"{"this":"other array"}"#);
    session
        .add_array(
            array_path.clone(),
            shape.clone(),
            dimension_names.clone(),
            array_def.clone(),
        )
        .await?;
    // Write all 4 chunks of the 4-element array.
    let bytes = Bytes::copy_from_slice(&42i8.to_be_bytes());
    for idx in 0..4 {
        let payload = session.get_chunk_writer()?(bytes.clone()).await?;
        session
            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload))
            .await?;
    }
    session.commit("first commit").max_concurrent_nodes(8).execute().await?;
    assert_manifest_count(repo.asset_manager(), 1).await;
    // Shrink 4 -> 2: out-of-bounds chunks force a manifest rewrite.
    let mut session = repo.writable_session("main").await?;
    let shape2 = ArrayShape::new(vec![(2, 2)]).unwrap();
    session
        .update_array(
            &array_path,
            shape2.clone(),
            dimension_names.clone(),
            array_def.clone(),
        )
        .await?;
    session.commit("second commit").max_concurrent_nodes(8).execute().await?;
    assert_manifest_count(repo.asset_manager(), 2).await;
    // Grow 2 -> 6: no chunk invalidated, so no new manifest is written.
    let mut session = repo.writable_session("main").await?;
    let shape3 = ArrayShape::new(vec![(6, 6)]).unwrap();
    session
        .update_array(
            &array_path,
            shape3.clone(),
            dimension_names.clone(),
            array_def.clone(),
        )
        .await?;
    session.commit("second commit").max_concurrent_nodes(8).execute().await?;
    assert_manifest_count(repo.asset_manager(), 2).await;
    Ok(())
}
/// Data must remain readable within a live session while the manifest split
/// configuration that applies to the array changes (here triggered by
/// renaming a dimension from "t" to "time", which selects a different
/// split-size rule), and while chunks are overwritten afterwards.
#[tokio_test]
#[apply(spec_version_cases)]
async fn test_splits_change_in_session(
    #[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn Error>> {
    let shape = ArrayShape::new(vec![(13, 13), (2, 2), (1, 1)]).unwrap();
    let dimension_names = Some(vec!["t".into(), "y".into(), "x".into()]);
    let new_dimension_names = Some(vec!["time".into(), "y".into(), "x".into()]);
    let array_path: Path = "/temperature".try_into().unwrap();
    let array_def = Bytes::from_static(br#"{"this":"other array"}"#);
    // Two rules: dimension "t" splits every 3 chunks, "time" every 4.
    let split_sizes = vec![
        (
            ManifestSplitCondition::PathMatches { regex: r".*".to_string() },
            vec![ManifestSplitDim {
                condition: ManifestSplitDimCondition::DimensionName(
                    "^t$".to_string(),
                ),
                num_chunks: 3,
            }],
        ),
        (
            ManifestSplitCondition::PathMatches { regex: r".*".to_string() },
            vec![ManifestSplitDim {
                condition: ManifestSplitDimCondition::DimensionName(
                    "time".to_string(),
                ),
                num_chunks: 4,
            }],
        ),
    ];
    let split_config = ManifestSplittingConfig { split_sizes: Some(split_sizes) };
    let backend: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let logging = Arc::new(LoggingStorage::new(Arc::clone(&backend)));
    let storage = Arc::clone(&logging);
    let storage: Arc<dyn Storage + Send + Sync> = storage;
    let repository = create_repo_with_split_manifest_config(
        &array_path,
        &shape,
        &dimension_names,
        &split_config,
        Some(Arc::clone(&storage)),
        spec_version,
    )
    .await?;
    // Reads chunks [0..12, 0, 0] and checks each equals its index + offset.
    let verify_data = async |session: &Session, offset: u32| {
        for idx in 0..12 {
            let actual = get_chunk(
                session
                    .get_chunk_reader(
                        &array_path,
                        &ChunkIndices(vec![idx, 0, 0]),
                        &ByteRange::ALL,
                    )
                    .await
                    .unwrap(),
            )
            .await
            .unwrap()
            .unwrap();
            let expected =
                Bytes::copy_from_slice(format!("{0}", idx + offset).as_bytes());
            assert_eq!(actual, expected);
        }
    };
    // Write under the original dimension names (split size 3).
    let mut session = repository.writable_session("main").await?;
    for i in 0..12 {
        session
            .set_chunk_ref(
                array_path.clone(),
                ChunkIndices(vec![i, 0, 0]),
                Some(ChunkPayload::Inline(format!("{i}").into())),
            )
            .await?;
    }
    verify_data(&session, 0).await;
    // Rename "t" -> "time" mid-session: the other split rule now applies.
    session
        .update_array(
            &array_path,
            shape.clone(),
            new_dimension_names.clone(),
            array_def.clone(),
        )
        .await?;
    verify_data(&session, 0).await;
    // Overwrite the same chunks; reads must see the new values.
    for i in 0..12 {
        session
            .set_chunk_ref(
                array_path.clone(),
                ChunkIndices(vec![i, 0, 0]),
                Some(ChunkPayload::Inline(format!("{0}", i + 10).into())),
            )
            .await?;
    }
    verify_data(&session, 10).await;
    Ok(())
}
/// `rewrite_manifests` should coalesce/split manifests according to a new
/// splitting config, keep all chunk data readable, and record the splitting
/// config in the resulting snapshot's metadata.
#[tokio_test]
#[apply(spec_version_cases)]
async fn tests_manifest_rewriting_simple(
    #[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn Error>> {
    let split_size = 3u32;
    let dim_size = 10u32;
    let shape = ArrayShape::new(vec![(dim_size as u64, dim_size)]).unwrap();
    let dimension_names = Some(vec!["t".into()]);
    let temp_path: Path = "/temperature".try_into().unwrap();
    let split_config = ManifestSplittingConfig::with_size(split_size);
    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let repository = create_repo_with_split_manifest_config(
        &temp_path,
        &shape,
        &dimension_names,
        &split_config,
        Some(Arc::clone(&storage)),
        spec_version,
    )
    .await?;
    let mut total_manifests = 0;
    assert_manifest_count(repository.asset_manager(), total_manifests).await;
    // Fill all 10 chunks; split size 3 => ceil(10/3) = 4 manifests.
    let mut session = repository.writable_session("main").await?;
    for i in 0..dim_size {
        session
            .set_chunk_ref(
                temp_path.clone(),
                ChunkIndices(vec![i]),
                Some(ChunkPayload::Inline(format!("{i}").into())),
            )
            .await?;
    }
    session.commit("first split").max_concurrent_nodes(8).execute().await?;
    total_manifests += 4;
    assert_manifest_count(repository.asset_manager(), total_manifests).await;
    // v1 cannot amend commits, v2 rewrites in place via Amend.
    let commit_method = match spec_version {
        SpecVersionBin::V1 => CommitMethod::NewCommit,
        SpecVersionBin::V2 => CommitMethod::Amend,
    };
    // Re-reads every chunk through a fresh repo handle and checks values.
    let validate_data = async || {
        let new_repo = reopen_repo_with_new_splitting_config(&repository, None).await;
        let session = new_repo
            .readonly_session(&VersionInfo::BranchTipRef("main".to_string()))
            .await
            .unwrap();
        for i in 0..dim_size {
            let val = get_chunk(
                session
                    .get_chunk_reader(
                        &temp_path,
                        &ChunkIndices(vec![i]),
                        &ByteRange::ALL,
                    )
                    .await
                    .unwrap(),
            )
            .await
            .unwrap()
            .unwrap();
            assert_eq!(val, Bytes::copy_from_slice(format!("{i}").as_bytes()));
        }
    };
    validate_data().await;
    // Rewrite with split size 12: all 10 chunks fit in one new manifest.
    let split_sizes = vec![(
        ManifestSplitCondition::PathMatches { regex: r".*".to_string() },
        vec![ManifestSplitDim {
            condition: ManifestSplitDimCondition::Any,
            num_chunks: 12,
        }],
    )];
    let new_repo =
        reopen_repo_with_new_splitting_config(&repository, Some(split_sizes)).await;
    let snap = rewrite_manifests(
        &new_repo,
        "main",
        "rewrite_manifests with split-size=12",
        8,
        None,
        commit_method,
    )
    .await?;
    total_manifests += 1;
    assert_manifest_count(new_repo.asset_manager(), total_manifests).await;
    validate_data().await;
    // The rewrite snapshot records the splitting config in its metadata.
    assert!(
        repository
            .lookup_snapshot(&snap)
            .await?
            .metadata
            .get("__icechunk")
            .and_then(|v| v.get("splitting_config"))
            .is_some()
    );
    // Rewrite again with split size 4: ceil(10/4) = 3 new manifests.
    let split_sizes = vec![(
        ManifestSplitCondition::PathMatches { regex: r".*".to_string() },
        vec![ManifestSplitDim {
            condition: ManifestSplitDimCondition::Any,
            num_chunks: 4,
        }],
    )];
    let new_repo =
        reopen_repo_with_new_splitting_config(&repository, Some(split_sizes)).await;
    let snap = rewrite_manifests(
        &new_repo,
        "main",
        "rewrite_manifests with split-size=4",
        8,
        None,
        commit_method,
    )
    .await?;
    total_manifests += 3;
    assert_manifest_count(new_repo.asset_manager(), total_manifests).await;
    validate_data().await;
    assert!(
        repository
            .lookup_snapshot(&snap)
            .await?
            .metadata
            .get("__icechunk")
            .and_then(|v| v.get("splitting_config"))
            .is_some()
    );
    Ok(())
}
#[tokio_test]
#[apply(spec_version_cases)]
async fn tests_manifest_splitting_simple(
#[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn Error>> {
// End-to-end manifest splitting over a (25, 2, 1)-chunk array split every 3
// chunks along "t". Tracks the expected number of manifest objects in storage
// after each commit, and uses a LoggingStorage wrapper to check that reads
// only fetch the manifest that covers the requested chunk.
let dim_size = 25u32;
let split_size = 3u32;
let shape =
ArrayShape::new(vec![(dim_size.into(), dim_size), (2, 2), (1, 1)]).unwrap();
let dimension_names = Some(vec!["t".into()]);
let temp_path: Path = "/temperature".try_into().unwrap();
let split_config = ManifestSplittingConfig::with_size(split_size);
// LoggingStorage wraps the in-memory backend so the test can observe which
// objects get fetched.
let backend: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
let logging = Arc::new(LoggingStorage::new(Arc::clone(&backend)));
let storage = Arc::clone(&logging);
let storage: Arc<dyn Storage + Send + Sync> = storage;
let repository = create_repo_with_split_manifest_config(
&temp_path,
&shape,
&dimension_names,
&split_config,
Some(Arc::clone(&storage)),
spec_version,
)
.await?;
// Running count of manifest objects expected to exist in storage.
let mut total_manifests = 0;
assert_manifest_count(repository.asset_manager(), total_manifests).await;
logging.clear();
let ops = logging.fetch_operations();
assert!(ops.is_empty());
// Two chunks inside the first split region -> exactly one new manifest.
let mut session = repository.writable_session("main").await?;
for i in 0..2 {
session
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(vec![i, 0, 0]),
Some(ChunkPayload::Inline(format!("{i}").into())),
)
.await?;
}
session.commit("first split").max_concurrent_nodes(8).execute().await?;
total_manifests += 1;
assert_manifest_count(repository.asset_manager(), total_manifests).await;
// One chunk in the last split region -> one more manifest.
let last_chunk = dim_size - 1;
let mut session = repository.writable_session("main").await?;
session
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(vec![last_chunk, 0, 0]),
Some(ChunkPayload::Inline(format!("{last_chunk}").into())),
)
.await?;
session.commit("last split").max_concurrent_nodes(8).execute().await?;
total_manifests += 1;
assert_manifest_count(repository.asset_manager(), total_manifests).await;
// Re-open through a second logging wrapper with object-fetch concurrency
// capped at 1 so the order/count of fetches is deterministic. The empty
// ManifestConfig presumably disables read-side preloading — TODO confirm.
let logging2 = Arc::new(LoggingStorage::new(Arc::clone(&backend)));
let storage2 = Arc::clone(&logging2);
let storage2: Arc<dyn Storage + Send + Sync> = storage2;
let config = RepositoryConfig {
manifest: Some(ManifestConfig::empty()),
storage: Some(storage::Settings {
concurrency: Some(storage::ConcurrencySettings {
max_concurrent_requests_for_object: NonZeroU16::new(1),
..Default::default()
}),
..Default::default()
}),
..RepositoryConfig::default()
};
let read_repo = Repository::open(Some(config), storage2, HashMap::new()).await?;
let session = read_repo
.readonly_session(&VersionInfo::BranchTipRef("main".to_string()))
.await?;
// Reading the last chunk must fetch exactly one manifest object: the split
// that contains it.
get_chunk(
session
.get_chunk_reader(
&temp_path,
&ChunkIndices(vec![last_chunk, 0, 0]),
&ByteRange::ALL,
)
.await
.unwrap(),
)
.await
.unwrap()
.unwrap();
let ops = logging2.fetch_operations();
assert_eq!(ops.iter().filter(|(_, key)| key.starts_with("manifests")).count(), 1);
logging2.clear();
// Read a chunk in a split region that was never written (index 4; only 0, 1
// and 24 exist). The read may yield no data (the final Option is not
// unwrapped) and must record no "fetch_manifest_splitting" operations.
get_chunk(
session
.get_chunk_reader(
&temp_path,
&ChunkIndices(vec![split_size + 1, 0, 0]),
&ByteRange::ALL,
)
.await
.unwrap(),
)
.await
.unwrap();
let ops = logging2.fetch_operations();
assert_eq!(
ops.iter().filter(|(op, _)| op == "fetch_manifest_splitting").count(),
0
);
// Write one chunk into every split region -> one new manifest per region.
let mut session = repository.writable_session("main").await?;
for i in (0..dim_size).step_by(split_size as usize) {
total_manifests += 1;
session
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(vec![i, 0, 0]),
Some(ChunkPayload::Inline(format!("{i}").into())),
)
.await?;
}
session.commit("wrote all splits").max_concurrent_nodes(8).execute().await?;
assert_manifest_count(repository.asset_manager(), total_manifests).await;
// Full overwrite of all 25 chunks rewrites every split:
// ceil(dim_size / split_size) new manifests.
let mut session = repository.writable_session("main").await?;
for i in 0..dim_size {
session
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(vec![i, 0, 0]),
Some(ChunkPayload::Inline(format!("{i}").into())),
)
.await?;
}
total_manifests += dim_size.div_ceil(split_size) as usize;
session.commit("full overwrite").max_concurrent_nodes(8).execute().await?;
assert_manifest_count(repository.asset_manager(), total_manifests).await;
// Read back every chunk through the (now committed) session and check values.
for i in 0..dim_size {
let val = get_chunk(
session
.get_chunk_reader(
&temp_path,
&ChunkIndices(vec![i, 0, 0]),
&ByteRange::ALL,
)
.await
.unwrap(),
)
.await
.unwrap()
.unwrap();
assert_eq!(val, Bytes::copy_from_slice(format!("{i}").as_bytes()));
}
// Deleting every chunk ref produces no new manifest objects.
let mut session = repository.writable_session("main").await?;
for i in 0..dim_size {
session
.set_chunk_ref(temp_path.clone(), ChunkIndices(vec![i, 0, 0]), None)
.await?;
}
total_manifests += 0;
session.commit("clear existing array").max_concurrent_nodes(8).execute().await?;
assert_manifest_count(repository.asset_manager(), total_manifests).await;
// A new array whose only chunk is set and then cleared within the same
// session also produces no manifest.
let def = Bytes::from_static(br#"{"this":"array"}"#);
let array_path: Path = "/array2".to_string().try_into().unwrap();
let mut session = repository.writable_session("main").await?;
session
.add_array(
array_path.clone(),
shape.clone(),
dimension_names.clone(),
def.clone(),
)
.await?;
session
.set_chunk_ref(
array_path.clone(),
ChunkIndices(vec![1, 0, 0]),
Some(ChunkPayload::Inline(format!("{0}", 10).into())),
)
.await?;
session
.set_chunk_ref(array_path.clone(), ChunkIndices(vec![1, 0, 0]), None)
.await?;
total_manifests += 0;
session.commit("clear new array").max_concurrent_nodes(8).execute().await?;
assert_manifest_count(repository.asset_manager(), total_manifests).await;
Ok(())
}
#[tokio_test]
async fn test_manifest_splitting_complex_config() -> Result<(), Box<dyn Error>> {
    // Three separate split-size entries (one dim each) must resolve to the
    // same splits as a single entry carrying all three dims.
    let shape = ArrayShape::new(vec![(25, 25), (10, 10), (3, 3), (4, 4)]).unwrap();
    let dimension_names = Some(vec!["t".into(), "z".into(), "y".into(), "x".into()]);
    let temp_path: Path = "/temperature".try_into().unwrap();

    // Builders so each variant gets fresh values without requiring Clone.
    let match_all = || ManifestSplitCondition::PathMatches { regex: r".*".to_string() };
    let t_dim = || ManifestSplitDim {
        condition: ManifestSplitDimCondition::DimensionName("t".to_string()),
        num_chunks: 12,
    };
    let axis2_dim = || ManifestSplitDim {
        condition: ManifestSplitDimCondition::Axis(2),
        num_chunks: 2,
    };
    let any_dim = || ManifestSplitDim {
        condition: ManifestSplitDimCondition::Any,
        num_chunks: 9,
    };

    // Expected split edges per axis for a (25, 10, 3, 4)-chunk array:
    // t every 12, z every 9 (Any), y every 2, x unsplit (9 > 4).
    let expected = ManifestSplits::from_edges(vec![
        vec![0, 12, 24, 25],
        vec![0, 9, 10],
        vec![0, 2, 3],
        vec![0, 4],
    ]);

    // Variant 1: one dim per condition entry.
    let split_config = ManifestSplittingConfig {
        split_sizes: Some(vec![
            (match_all(), vec![t_dim()]),
            (match_all(), vec![axis2_dim()]),
            (match_all(), vec![any_dim()]),
        ]),
    };
    assert_eq!(
        split_config.get_split_sizes(&temp_path, &shape, &dimension_names),
        expected
    );

    // Variant 2: all three dims under a single condition entry.
    let split_config = ManifestSplittingConfig {
        split_sizes: Some(vec![(match_all(), vec![t_dim(), axis2_dim(), any_dim()])]),
    };
    assert_eq!(
        split_config.get_split_sizes(&temp_path, &shape, &dimension_names),
        expected
    );
    Ok(())
}
#[tokio_test]
#[apply(spec_version_cases)]
async fn test_manifest_splitting_complex_writes(
#[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn Error>> {
// Writes along each axis of a 4-D array under a multi-entry splitting
// config, then changes the config and keeps writing. Each chunk's payload is
// its raveled linear index so reads can be verified exactly.
let t_split_size = 12u32;
let other_split_size = 9u32;
let y_split_size = 2u32;
let shape = ArrayShape::new(vec![(25, 25), (10, 10), (3, 3), (4, 4)]).unwrap();
let dimension_names = Some(vec!["t".into(), "z".into(), "y".into(), "x".into()]);
let temp_path: Path = "/temperature".try_into().unwrap();
let split_sizes = vec![
(
ManifestSplitCondition::AnyArray,
vec![ManifestSplitDim {
condition: ManifestSplitDimCondition::DimensionName("t".to_string()),
num_chunks: t_split_size,
}],
),
(
ManifestSplitCondition::PathMatches { regex: r".*".to_string() },
vec![ManifestSplitDim {
condition: ManifestSplitDimCondition::Axis(2),
num_chunks: y_split_size,
}],
),
(
ManifestSplitCondition::NameMatches { regex: r".*".to_string() },
vec![ManifestSplitDim {
condition: ManifestSplitDimCondition::Any,
num_chunks: other_split_size,
}],
),
];
// Effective per-axis split sizes implied by the config above:
// t=12 (named), z=9 (Any), y=2 (Axis 2), x=9 (Any).
let expected_split_sizes = [t_split_size, 9, y_split_size, 9];
let split_config = ManifestSplittingConfig { split_sizes: Some(split_sizes) };
let backend: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
let logging = Arc::new(LoggingStorage::new(Arc::clone(&backend)));
let logging_c = Arc::clone(&logging);
let logging_c: Arc<dyn Storage + Send + Sync> = logging_c;
let repository = create_repo_with_split_manifest_config(
&temp_path,
&shape,
&dimension_names,
&split_config,
Some(logging_c),
spec_version,
)
.await?;
// A second handle over the same storage, kept on the ORIGINAL splitting
// config for the cross-config writes later in the test.
let repo_clone = repository.reopen(None, None).await?;
let mut total_manifests = 0;
assert_manifest_count(repo_clone.asset_manager(), total_manifests).await;
logging.clear();
let ops = logging.fetch_operations();
assert!(ops.is_empty());
let array_shape =
shape.iter().map(|x| x.array_length() as u32).collect::<Vec<_>>();
// Reads every chunk along axis `ax` (other coords 0) and asserts the value
// equals the raveled linear index of the chunk.
let verify_data = async |ax, session: &Session| {
for i in 0..shape.get(ax).unwrap().array_length() {
let mut index = vec![0u32, 0, 0, 0];
index[ax] = i as u32;
let ic = index.clone();
let val = get_chunk(
session
.get_chunk_reader(
&temp_path,
&ChunkIndices(index),
&ByteRange::ALL,
)
.await
.unwrap(),
)
.await
.unwrap()
.unwrap_or_else(|| panic!("getting chunk ref failed for {:?}", &ic));
let expected_value =
ravel_multi_index(ic.as_slice(), array_shape.as_slice());
let expected =
Bytes::copy_from_slice(format!("{expected_value}").as_bytes());
assert_eq!(
val, expected,
"For chunk {ic:?}, received {val:?}, expected {expected:?}"
);
}
};
// Runs verify_data for every axis against the branch tip of `repo`.
let verify_all_data = async |repo: &Repository| {
let session = repo
.readonly_session(&VersionInfo::BranchTipRef("main".to_string()))
.await
.unwrap();
for ax in 0..shape.len() {
verify_data(ax, &session).await;
}
};
// Write all chunks along each axis in turn; each commit adds
// ceil(axis_len / split_size[ax]) manifests.
for ax in 0..shape.len() {
let mut session = repository.writable_session("main").await?;
let axis_size = shape.get(ax).unwrap().array_length();
for i in 0..axis_size {
let mut index = vec![0u32, 0, 0, 0];
index[ax] = i as u32;
let value = ravel_multi_index(index.as_slice(), array_shape.as_slice());
session
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(index),
Some(ChunkPayload::Inline(format!("{value}").into())),
)
.await?;
}
total_manifests +=
(axis_size as u32).div_ceil(expected_split_sizes[ax]) as usize;
session
.commit(format!("finished axis {ax}"))
.max_concurrent_nodes(8)
.execute()
.await?;
assert_manifest_count(repository.asset_manager(), total_manifests).await;
verify_data(ax, &session).await;
}
verify_all_data(&repository).await;
// Switch to a t-only splitting config; existing data must stay readable.
let split_sizes = vec![(
ManifestSplitCondition::AnyArray,
vec![ManifestSplitDim {
condition: ManifestSplitDimCondition::DimensionName("t".to_string()),
num_chunks: t_split_size,
}],
)];
let repository =
reopen_repo_with_new_splitting_config(&repository, Some(split_sizes)).await;
verify_all_data(&repository).await;
// One chunk write under the new config -> one new manifest.
let mut session = repository.writable_session("main").await?;
let index = vec![13, 0, 0, 0];
let value = ravel_multi_index(index.as_slice(), array_shape.as_slice());
session
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(index),
Some(ChunkPayload::Inline(format!("{value}").into())),
)
.await?;
total_manifests += 1;
session.commit("finished time again").max_concurrent_nodes(8).execute().await?;
assert_manifest_count(repository.asset_manager(), total_manifests).await;
verify_all_data(&repository).await;
// Touch one chunk in each of the three t-split regions (0, 12, 24):
// ceil(25 / 12) = 3 new manifests.
let mut session = repository.writable_session("main").await?;
for idx in [0, 12, 24] {
let index = vec![idx, 0, 0, 0];
let value = ravel_multi_index(index.as_slice(), array_shape.as_slice());
session
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(index),
Some(ChunkPayload::Inline(format!("{value}").into())),
)
.await?;
}
total_manifests +=
(shape.get(0).unwrap().array_length() as u32).div_ceil(t_split_size) as usize;
session.commit("finished time again").max_concurrent_nodes(8).execute().await?;
assert_manifest_count(repository.asset_manager(), total_manifests).await;
verify_all_data(&repository).await;
// Write through repo_clone, which still carries the original multi-entry
// config; one chunk write is expected to yield 3 new manifests here —
// NOTE(review): presumably due to that config's split layout, confirm.
let mut session = repo_clone.writable_session("main").await?;
session
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(vec![0, 0, 0, 0]),
Some(ChunkPayload::Inline(format!("{0}", 0).into())),
)
.await?;
total_manifests += 3;
session.commit("finished time again").max_concurrent_nodes(8).execute().await?;
assert_manifest_count(repository.asset_manager(), total_manifests).await;
verify_all_data(&repo_clone).await;
verify_all_data(&repository).await;
// Same three-region write pattern through repo_clone.
let mut session = repo_clone.writable_session("main").await?;
for idx in [0, 12, 24] {
let index = vec![idx, 0, 0, 0];
let value = ravel_multi_index(index.as_slice(), array_shape.as_slice());
session
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(index),
Some(ChunkPayload::Inline(format!("{value}").into())),
)
.await?;
}
total_manifests +=
(shape.get(0).unwrap().array_length() as u32).div_ceil(t_split_size) as usize;
session.commit("finished time again").max_concurrent_nodes(8).execute().await?;
assert_manifest_count(repo_clone.asset_manager(), total_manifests).await;
verify_all_data(&repo_clone).await;
// Overwrite the same three chunks with different values (idx + 2) and read
// them back through the committing session to verify the overwrite.
let mut session = repo_clone.writable_session("main").await?;
for idx in [0, 12, 24] {
let index = vec![idx, 0, 0, 0];
session
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(index),
Some(ChunkPayload::Inline(format!("{0}", idx + 2).into())),
)
.await?;
}
total_manifests +=
(shape.get(0).unwrap().array_length() as u32).div_ceil(t_split_size) as usize;
session.commit("finished time again").max_concurrent_nodes(8).execute().await?;
assert_manifest_count(repo_clone.asset_manager(), total_manifests).await;
for idx in [0, 12, 24] {
let actual = get_chunk(
session
.get_chunk_reader(
&temp_path,
&ChunkIndices(vec![idx, 0, 0, 0]),
&ByteRange::ALL,
)
.await
.unwrap(),
)
.await
.unwrap()
.unwrap();
let expected = Bytes::copy_from_slice(format!("{0}", idx + 2).as_bytes());
assert_eq!(actual, expected);
}
Ok(())
}
#[tokio_test]
#[apply(spec_version_cases)]
async fn test_manifest_splits_merge_sessions(
#[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn Error>> {
// Sessions created from repos with DIFFERENT splitting configs must still be
// mergeable, and merged deletes must be visible alongside merged writes.
let shape = ArrayShape::new(vec![(25, 25), (10, 10), (3, 3), (4, 4)]).unwrap();
let dimension_names = Some(vec!["t".into(), "z".into(), "y".into(), "x".into()]);
let temp_path: Path = "/temperature".try_into().unwrap();
let orig_split_sizes = vec![(
ManifestSplitCondition::AnyArray,
vec![ManifestSplitDim {
condition: ManifestSplitDimCondition::DimensionName("t".to_string()),
num_chunks: 12u32,
}],
)];
let split_config =
ManifestSplittingConfig { split_sizes: Some(orig_split_sizes.clone()) };
let backend: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
let repository = create_repo_with_split_manifest_config(
&temp_path,
&shape,
&dimension_names,
&split_config,
Some(backend),
spec_version,
)
.await?;
// Four chunk coordinates: [0]/[1] written by session1, [2]/[3] by session2.
let indices =
[vec![0, 0, 1, 0], vec![0, 0, 0, 0], vec![0, 2, 0, 0], vec![0, 2, 0, 1]];
let mut session1 = repository.writable_session("main").await?;
session1
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(indices[0].clone()),
Some(ChunkPayload::Inline(format!("{0}", 0).into())),
)
.await?;
session1
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(indices[1].clone()),
Some(ChunkPayload::Inline(format!("{0}", 1).into())),
)
.await?;
// Merging a session opened under an incompatible t-split size (including
// extremes 1 and u32::MAX) into session1 must succeed every time.
for incompatible_size in [1, 11u32, 24u32, u32::MAX] {
let incompatible_split_sizes = vec![(
ManifestSplitCondition::AnyArray,
vec![ManifestSplitDim {
condition: ManifestSplitDimCondition::DimensionName("t".to_string()),
num_chunks: incompatible_size,
}],
)];
let other_repo = reopen_repo_with_new_splitting_config(
&repository,
Some(incompatible_split_sizes),
)
.await;
assert_ne!(other_repo.config(), repository.config());
let mut session2 = other_repo.writable_session("main").await?;
session2
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(indices[2].clone()),
Some(ChunkPayload::Inline(format!("{0}", 2).into())),
)
.await?;
session2
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(indices[3].clone()),
Some(ChunkPayload::Inline(format!("{0}", 3).into())),
)
.await?;
assert!(session1.merge(session2).await.is_ok());
}
// A session under the ORIGINAL split sizes; it is written but never merged —
// the values asserted below all come from the merges in the loop above.
let other_repo =
reopen_repo_with_new_splitting_config(&repository, Some(orig_split_sizes))
.await;
let mut session2 = other_repo.writable_session("main").await?;
session2
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(indices[2].clone()),
Some(ChunkPayload::Inline(format!("{0}", 2).into())),
)
.await?;
session2
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(indices[3].clone()),
Some(ChunkPayload::Inline(format!("{0}", 3).into())),
)
.await?;
// session1 must now see all four chunks with values 0..=3.
for (val, idx) in enumerate(indices.iter()) {
let actual = get_chunk(
session1
.get_chunk_reader(
&temp_path,
&ChunkIndices(idx.clone()),
&ByteRange::ALL,
)
.await
.unwrap(),
)
.await
.unwrap()
.unwrap_or_else(|| panic!("getting chunk ref failed for {:?}", &idx));
let expected = Bytes::copy_from_slice(format!("{val}").as_bytes());
assert_eq!(actual, expected);
}
// Second phase: merge a session containing DELETES into one containing
// writes; the merged view must show both.
let mut session1 = repository.writable_session("main").await?;
session1
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(indices[0].clone()),
Some(ChunkPayload::Inline(format!("{0}", 3).into())),
)
.await?;
session1
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(indices[1].clone()),
Some(ChunkPayload::Inline(format!("{0}", 4).into())),
)
.await?;
let mut session2 = repository.writable_session("main").await?;
session2
.set_chunk_ref(temp_path.clone(), ChunkIndices(indices[2].clone()), None)
.await?;
session2
.set_chunk_ref(temp_path.clone(), ChunkIndices(indices[3].clone()), None)
.await?;
session1.merge(session2).await?;
// Writes survive, deletes read back as None.
let expected = [Some(3), Some(4), None, None];
for (expect, idx) in zip(expected.iter(), indices.iter()) {
let actual = get_chunk(
session1
.get_chunk_reader(
&temp_path,
&ChunkIndices(idx.clone()),
&ByteRange::ALL,
)
.await
.unwrap(),
)
.await
.unwrap();
let expected_value =
expect.map(|val| Bytes::copy_from_slice(format!("{val}").as_bytes()));
assert_eq!(actual, expected_value);
}
Ok(())
}
#[tokio_test]
#[apply(spec_version_cases)]
async fn test_commits_with_conflicting_manifest_splits(
#[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn Error>> {
// Two concurrent sessions under DIFFERENT splitting configs: the second
// commit must conflict, and a rebase with the default solver must recover
// without losing either session's writes.
let shape = ArrayShape::new(vec![(25, 25), (10, 10), (3, 3), (4, 4)]).unwrap();
let dimension_names = Some(vec!["t".into(), "z".into(), "y".into(), "x".into()]);
let temp_path: Path = "/temperature".try_into().unwrap();
let orig_split_sizes = vec![(
ManifestSplitCondition::AnyArray,
vec![ManifestSplitDim {
condition: ManifestSplitDimCondition::DimensionName("t".to_string()),
num_chunks: 12u32,
}],
)];
let split_config =
ManifestSplittingConfig { split_sizes: Some(orig_split_sizes.clone()) };
let backend: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
let repository = create_repo_with_split_manifest_config(
&temp_path,
&shape,
&dimension_names,
&split_config,
Some(backend),
spec_version,
)
.await?;
// [0]/[1] go to session1, [2]/[3] to session2; payloads are the positions
// 0..=3, so the final read-back can assert exact values.
let indices =
[vec![0, 0, 1, 0], vec![0, 0, 0, 0], vec![0, 2, 0, 0], vec![0, 2, 0, 1]];
let mut session1 = repository.writable_session("main").await?;
session1
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(indices[0].clone()),
Some(ChunkPayload::Inline(format!("{0}", 0).into())),
)
.await?;
session1
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(indices[1].clone()),
Some(ChunkPayload::Inline(format!("{0}", 1).into())),
)
.await?;
// session2 comes from a repo handle with t-split 11 instead of 12.
let incompatible_size = 11u32;
let incompatible_split_sizes = vec![(
ManifestSplitCondition::AnyArray,
vec![ManifestSplitDim {
condition: ManifestSplitDimCondition::DimensionName("t".to_string()),
num_chunks: incompatible_size,
}],
)];
let other_repo = reopen_repo_with_new_splitting_config(
&repository,
Some(incompatible_split_sizes),
)
.await;
assert_ne!(other_repo.config(), repository.config());
let mut session2 = other_repo.writable_session("main").await?;
session2
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(indices[2].clone()),
Some(ChunkPayload::Inline(format!("{0}", 2).into())),
)
.await?;
session2
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(indices[3].clone()),
Some(ChunkPayload::Inline(format!("{0}", 3).into())),
)
.await?;
// session1 lands first; session2's commit must then fail with a Conflict,
// after which rebase + recommit must succeed.
session1.commit("first commit").max_concurrent_nodes(8).execute().await?;
if let Err(SessionError { kind: SessionErrorKind::Conflict { .. }, .. }) =
session2.commit("second commit").max_concurrent_nodes(8).execute().await
{
let solver = BasicConflictSolver::default();
assert!(session2.rebase(&solver).await.is_ok());
session2
.commit("second commit after rebase")
.max_concurrent_nodes(8)
.execute()
.await?;
} else {
panic!("this should have conflicted!");
}
// After the rebase, the branch tip must contain all four writes.
let new_session = repository
.readonly_session(&VersionInfo::BranchTipRef("main".into()))
.await?;
for (val, idx) in enumerate(indices.iter()) {
let actual = get_chunk(
new_session
.get_chunk_reader(
&temp_path,
&ChunkIndices(idx.clone()),
&ByteRange::ALL,
)
.await
.unwrap(),
)
.await
.unwrap()
.unwrap_or_else(|| panic!("getting chunk ref failed for {:?}", &idx));
let expected = Bytes::copy_from_slice(format!("{val}").as_bytes());
assert_eq!(actual, expected);
}
Ok(())
}
#[tokio_test]
#[apply(spec_version_cases)]
async fn test_manifest_preload_known_manifests(
#[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn Error>> {
// Exercises manifest preloading on session open: with max_total_refs = 2,
// only small manifests get preloaded in the background, and the asserts at
// the end pin both WHICH manifests are fetched and in what ORDER.
let backend: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
let storage = Arc::clone(&backend);
let repository =
Repository::create(None, storage, HashMap::new(), Some(spec_version), true)
.await?;
// Build four arrays: time gets 3 chunk refs; latitude, longitude and
// temperature get 1 each.
let mut session = repository.writable_session("main").await?;
let def = Bytes::from_static(br#"{"this":"array"}"#);
session.add_group(Path::root(), def.clone()).await?;
let shape = ArrayShape::new(vec![(1_000, 1_000), (1, 1), (1, 1)]).unwrap();
let dimension_names = Some(vec!["t".into()]);
let time_path: Path = "/time".try_into().unwrap();
let temp_path: Path = "/temperature".try_into().unwrap();
let lat_path: Path = "/latitude".try_into().unwrap();
let lon_path: Path = "/longitude".try_into().unwrap();
session
.add_array(
time_path.clone(),
shape.clone(),
dimension_names.clone(),
def.clone(),
)
.await?;
session
.add_array(
temp_path.clone(),
shape.clone(),
dimension_names.clone(),
def.clone(),
)
.await?;
session
.add_array(
lat_path.clone(),
shape.clone(),
dimension_names.clone(),
def.clone(),
)
.await?;
session
.add_array(
lon_path.clone(),
shape.clone(),
dimension_names.clone(),
def.clone(),
)
.await?;
session
.set_chunk_ref(
time_path.clone(),
ChunkIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline("hello".into())),
)
.await?;
session
.set_chunk_ref(
time_path.clone(),
ChunkIndices(vec![1, 0, 0]),
Some(ChunkPayload::Inline("hello".into())),
)
.await?;
session
.set_chunk_ref(
time_path.clone(),
ChunkIndices(vec![2, 0, 0]),
Some(ChunkPayload::Inline("hello".into())),
)
.await?;
session
.set_chunk_ref(
lat_path.clone(),
ChunkIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline("hello".into())),
)
.await?;
session
.set_chunk_ref(
lon_path.clone(),
ChunkIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline("hello".into())),
)
.await?;
session
.set_chunk_ref(
temp_path.clone(),
ChunkIndices(vec![0, 0, 0]),
Some(ChunkPayload::Inline("hello".into())),
)
.await?;
session.commit("create arrays").max_concurrent_nodes(8).execute().await?;
// Re-open with a logging wrapper, preload capped at 2 total refs, and
// object-fetch concurrency of 1 so the preload fetch order is deterministic.
let logging = Arc::new(LoggingStorage::new(Arc::clone(&backend)));
let logging_c = Arc::clone(&logging);
let logging_c: Arc<dyn Storage + Send + Sync> = logging_c;
let storage = Arc::clone(&logging_c);
let man_config = ManifestConfig {
preload: Some(ManifestPreloadConfig {
max_total_refs: Some(2),
preload_if: None,
max_arrays_to_scan: None,
}),
..ManifestConfig::default()
};
let config = RepositoryConfig {
manifest: Some(man_config),
storage: Some(storage::Settings {
concurrency: Some(storage::ConcurrencySettings {
max_concurrent_requests_for_object: NonZeroU16::new(1),
..Default::default()
}),
..Default::default()
}),
..RepositoryConfig::default()
};
let repository = Repository::open(Some(config), storage, HashMap::new()).await?;
// Opening the repo itself must not touch manifests or snapshots (only repo
// metadata, config and refs).
let ops =
Vec::from_iter(logging.fetch_operations().into_iter().filter(|(_, key)| {
key != "repo" && key != "config.yaml" && !key.contains("ref.json")
}));
assert!(ops.is_empty());
let session = repository
.readonly_session(&VersionInfo::BranchTipRef("main".to_string()))
.await?;
// Preload runs in the background; poll (up to ~5s) until some manifest
// fetch has been logged.
let mut retries = 0;
while retries < 50
&& !logging
.fetch_operations()
.iter()
.any(|(_, key)| key.starts_with("manifests"))
{
tokio::time::sleep(std::time::Duration::from_secs_f32(0.1)).await;
retries += 1;
}
let ops =
Vec::from_iter(logging.fetch_operations().into_iter().filter(|(_, key)| {
key.starts_with("manifests") || key.starts_with("snapshots")
}));
let lat_manifest_id = match session.get_node(&lat_path).await?.node_data {
NodeData::Array { manifests, .. } => manifests[0].object_id.to_string(),
NodeData::Group => panic!(),
};
let lon_manifest_id = match session.get_node(&lon_path).await?.node_data {
NodeData::Array { manifests, .. } => manifests[0].object_id.to_string(),
NodeData::Group => panic!(),
};
// Expected fetch order: the snapshot first, then the latitude and longitude
// manifests. NOTE(review): time (3 refs) exceeds max_total_refs and
// temperature is presumably excluded by the default preload condition —
// only lat/lon fetches are asserted; confirm against the preload defaults.
assert!(ops[0].1.starts_with("snapshots"));
assert_eq!(
ops[1],
(
"get_object_range".to_string(),
format!("{MANIFESTS_FILE_PATH}/{lat_manifest_id}")
)
);
assert_eq!(
ops[2],
(
"get_object_range".to_string(),
format!("{MANIFESTS_FILE_PATH}/{lon_manifest_id}")
)
);
Ok(())
}
#[cfg(feature = "object-store-fs")]
#[tokio::test]
async fn creation_in_non_empty_directory_fails() -> Result<(), Box<dyn Error>> {
    use crate::new_local_filesystem_storage;

    // First create in a fresh directory succeeds; a second create over the
    // same (now non-empty) directory must be rejected.
    let repo_dir = TempDir::new()?;
    let storage: Arc<dyn Storage + Send + Sync> =
        new_local_filesystem_storage(repo_dir.path())
            .await
            .expect("Creating local storage failed");
    Repository::create(None, Arc::clone(&storage), HashMap::new(), None, true)
        .await?;
    assert!(
        Repository::create(None, Arc::clone(&storage), HashMap::new(), None, true)
            .await
            .is_err()
    );

    // Pointing storage at the "snapshots" subdirectory that the first repo
    // populated must fail as well: it is not an empty directory either.
    let inner_path: PathBuf = repo_dir.path().join("snapshots");
    let storage: Arc<dyn Storage + Send + Sync> =
        new_local_filesystem_storage(&inner_path)
            .await
            .expect("Creating local storage failed");
    assert!(
        Repository::create(None, Arc::clone(&storage), HashMap::new(), None, true)
            .await
            .is_err()
    );
    Ok(())
}
#[tokio::test]
async fn create_repo_with_spec_version_2() -> Result<(), Box<dyn Error>> {
    // A repo created with an explicit V2 spec version must report V2 after a
    // plain reopen with no configuration.
    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let version = Some(SpecVersionBin::V2);
    Repository::create(None, Arc::clone(&storage), HashMap::new(), version, true)
        .await?;
    let reopened = Repository::open(None, storage, Default::default()).await?;
    assert_eq!(reopened.spec_version(), SpecVersionBin::V2);
    Ok(())
}
#[tokio::test]
async fn create_repo_with_spec_version_1() -> Result<(), Box<dyn Error>> {
    // A repo created with an explicit V1 spec version must report V1 after a
    // plain reopen with no configuration.
    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let version = Some(SpecVersionBin::V1);
    Repository::create(None, Arc::clone(&storage), HashMap::new(), version, true)
        .await?;
    let reopened = Repository::open(None, storage, Default::default()).await?;
    assert_eq!(reopened.spec_version(), SpecVersionBin::V1);
    Ok(())
}
#[tokio::test]
async fn set_metadata() -> Result<(), Box<dyn Error>> {
    // A fresh repo starts with empty metadata; set_metadata replaces it and
    // get_metadata reads the stored value back.
    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let repo =
        Repository::create(None, Arc::clone(&storage), HashMap::new(), None, true)
            .await?;
    assert_eq!(repo.get_metadata().await?, Default::default());

    let meta = [
        ("foo".to_string(), "bar".into()),
        ("number".to_string(), 42.into()),
    ]
    .into();
    repo.set_metadata(&meta).await?;
    assert_eq!(repo.get_metadata().await?, meta);
    Ok(())
}
#[tokio::test]
async fn update_metadata() -> Result<(), Box<dyn Error>> {
    // update_metadata merges keys into the existing metadata and returns the
    // merged result.
    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let repo =
        Repository::create(None, Arc::clone(&storage), HashMap::new(), None, true)
            .await?;

    // First update on an empty repo behaves like a plain set.
    let meta = [
        ("foo".to_string(), "bar".into()),
        ("number".to_string(), 42.into()),
    ]
    .into();
    let returned = repo.update_metadata(&meta).await?;
    assert_eq!(returned, meta);
    assert_eq!(repo.get_metadata().await?, meta);

    // Updating one key overwrites it and leaves the other keys untouched.
    let returned =
        repo.update_metadata(&[("number".to_string(), 43.into())].into()).await?;
    let expected = [
        ("foo".to_string(), "bar".into()),
        ("number".to_string(), 43.into()),
    ]
    .into();
    assert_eq!(returned, expected);
    assert_eq!(repo.get_metadata().await?, expected);

    // Updating with a None value keeps the key, mapped to the None-derived
    // value, rather than removing it.
    let returned = repo
        .update_metadata(&[("foo".to_string(), None::<String>.into())].into())
        .await?;
    let expected = [
        ("foo".to_string(), None::<String>.into()),
        ("number".to_string(), 43.into()),
    ]
    .into();
    assert_eq!(returned, expected);
    assert_eq!(repo.get_metadata().await?, expected);
    Ok(())
}
#[tokio::test]
async fn test_ops_log_chain_with_changing_num_updates_per_file()
-> Result<(), Box<dyn Error>> {
// The ops log must remain a complete, correctly ordered chain even when the
// repo is reopened with a different num_updates_per_repo_info_file, i.e.
// when the chunking of updates into repo-info files changes mid-history.
let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
// Start with one update per repo-info file.
let config = RepositoryConfig {
num_updates_per_repo_info_file: Some(1),
..Default::default()
};
let repo = Repository::create(
Some(config.clone()),
Arc::clone(&storage),
HashMap::new(),
None,
true,
)
.await?;
// A fresh repo's log contains exactly the initialization entry.
let (stream, _, _) = repo.ops_log().await?;
let ops: Vec<_> = stream.try_collect().await?;
assert_eq!(ops.len(), 1);
assert!(matches!(ops[0].1, UpdateType::RepoInitializedUpdate));
// Each ref operation appends one entry to the log.
let snap_id = repo.lookup_branch("main").await?;
repo.create_branch("test-branch", &snap_id).await?;
let (stream, _, _) = repo.ops_log().await?;
let ops: Vec<_> = stream.try_collect().await?;
assert_eq!(ops.len(), 2);
repo.create_tag("test-tag", &snap_id).await?;
let (stream, _, _) = repo.ops_log().await?;
let ops: Vec<_> = stream.try_collect().await?;
assert_eq!(ops.len(), 3);
// The stream is newest-first: head is the last tag, tail is initialization.
assert!(matches!(ops[0].1, UpdateType::TagCreatedUpdate { .. }));
assert!(matches!(ops.last().unwrap().1, UpdateType::RepoInitializedUpdate));
// Reopen with 10 updates per file; the existing chain must still be fully
// readable and new entries must append to it.
let config2 = RepositoryConfig {
num_updates_per_repo_info_file: Some(10),
..Default::default()
};
let repo =
Repository::open(Some(config2), Arc::clone(&storage), HashMap::new()).await?;
repo.create_tag("test-tag-1", &snap_id).await?;
let (stream, _, _) = repo.ops_log().await?;
let ops: Vec<_> = stream.try_collect().await?;
assert_eq!(ops.len(), 4);
assert!(matches!(ops.last().unwrap().1, UpdateType::RepoInitializedUpdate));
// Switch back to 1 update per file and append two more entries; the chain
// stays intact across both chunking changes.
let config2 = RepositoryConfig {
num_updates_per_repo_info_file: Some(1),
..Default::default()
};
let repo =
Repository::open(Some(config2), Arc::clone(&storage), HashMap::new()).await?;
repo.create_tag("test-tag-2", &snap_id).await?;
let (stream, _, _) = repo.ops_log().await?;
let ops: Vec<_> = stream.try_collect().await?;
assert_eq!(ops.len(), 5);
assert!(matches!(ops.last().unwrap().1, UpdateType::RepoInitializedUpdate));
repo.create_tag("test-tag-3", &snap_id).await?;
let (stream, _, _) = repo.ops_log().await?;
let ops: Vec<_> = stream.try_collect().await?;
assert_eq!(ops.len(), 6);
assert!(matches!(ops.last().unwrap().1, UpdateType::RepoInitializedUpdate));
Ok(())
}
#[tokio_test]
async fn test_virtual_chunk_location_compression_after_migration()
-> Result<(), Box<dyn Error>> {
// Virtual chunk location compression is configured from the start, but V1
// manifests must not use it; only after migrating the repo to spec V2 and
// rewriting manifests should compressed location refs appear.
let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
// min_num_chunks = 1 makes every manifest eligible for compression;
// inline_chunk_threshold_bytes = 0 forces chunk refs out of the snapshot.
let config = RepositoryConfig {
manifest: Some(ManifestConfig {
virtual_chunk_location_compression: Some(
ManifestVirtualChunkLocationCompressionConfig {
min_num_chunks: Some(1),
..Default::default()
},
),
..ManifestConfig::empty()
}),
inline_chunk_threshold_bytes: Some(0),
..Default::default()
};
// Create the repo on spec V1 deliberately.
let repo = Repository::create(
Some(config.clone()),
Arc::clone(&storage),
HashMap::new(),
Some(SpecVersionBin::V1),
true,
)
.await?;
let mut session = repo.writable_session("main").await?;
session.add_group(Path::root(), Bytes::copy_from_slice(b"")).await?;
let array_path: Path = "/array".to_string().try_into().unwrap();
let shape = ArrayShape::new(vec![(10, 10)]).unwrap();
session
.add_array(
array_path.clone(),
shape,
Some(vec!["x".into()]),
Bytes::from_static(b"{}"),
)
.await?;
// Five virtual chunk refs pointing at distinct s3 objects, 1000 bytes each.
for idx in 0..5u32 {
let location =
VirtualChunkLocation::from_url(&format!("s3://bucket/file_{idx}.dat"))
.unwrap();
session
.set_chunk_ref(
array_path.clone(),
ChunkIndices(vec![idx]),
Some(ChunkPayload::Virtual(VirtualChunkRef {
location,
offset: idx as u64 * 1000,
length: 1000,
checksum: None,
})),
)
.await?;
}
session.commit("add virtual chunks").max_concurrent_nodes(8).execute().await?;
// On V1, no manifest may use location compression despite the config.
let snap_id =
repo.resolve_version(&VersionInfo::BranchTipRef("main".to_string())).await?;
let snapshot = repo.asset_manager().fetch_snapshot(&snap_id).await?;
for mf in snapshot.manifest_files() {
let mf = mf?;
let manifest =
repo.asset_manager().fetch_manifest_unknown_size(&mf.id).await?;
assert!(
!manifest.uses_location_compression(),
"V1 manifests should not use location compression"
);
assert_eq!(
manifest.num_compressed_refs(),
0,
"V1 manifests should have no compressed refs"
);
}
// Migrate the repo to spec V2 and rewrite all manifests.
migrate_1_to_2(repo, false, true, None).await.unwrap();
let repo =
Repository::open(Some(config), Arc::clone(&storage), HashMap::new()).await?;
assert_eq!(repo.spec_version(), SpecVersionBin::V2);
rewrite_manifests(
&repo,
"main",
"rewrite manifests",
8,
None,
CommitMethod::NewCommit,
)
.await
.unwrap();
// After the rewrite, every manifest must carry compressed location refs.
let snap_id =
repo.resolve_version(&VersionInfo::BranchTipRef("main".to_string())).await?;
let snapshot = repo.asset_manager().fetch_snapshot(&snap_id).await?;
for mf in snapshot.manifest_files() {
let mf = mf?;
let manifest =
repo.asset_manager().fetch_manifest_unknown_size(&mf.id).await?;
assert!(
manifest.uses_location_compression(),
"IC2 manifests should use location compression after rewrite"
);
assert!(
manifest.num_compressed_refs() > 0,
"IC2 manifests should have compressed refs after rewrite"
);
}
Ok(())
}
#[tokio_test]
async fn test_rewrite_manifests_after_rearrange() -> Result<(), Box<dyn Error>> {
    // After a rearrange (node move) commit at the branch tip, an Amend-style
    // manifest rewrite must fail while a NewCommit rewrite must succeed.
    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let repo = Repository::create(
        None,
        Arc::clone(&storage),
        HashMap::new(),
        Some(SpecVersionBin::current()),
        true,
    )
    .await?;

    // Commit an initial hierarchy containing a "/source" group.
    let mut writer = repo.writable_session("main").await?;
    writer.add_group(Path::root(), Bytes::copy_from_slice(b"")).await?;
    writer
        .add_group("/source".try_into().unwrap(), Bytes::copy_from_slice(b""))
        .await?;
    writer.commit("init").max_concurrent_nodes(8).execute().await?;

    // Move the group so that the branch tip becomes a rearrange commit.
    let mut mover = repo.rearrange_session("main").await?;
    mover
        .move_node("/source".try_into().unwrap(), "/dest".try_into().unwrap())
        .await?;
    mover.commit("moved source to dest").max_concurrent_nodes(8).execute().await?;

    let reopened = Repository::open(None, Arc::clone(&storage), HashMap::new()).await?;
    // Amending the rearrange commit is rejected...
    assert!(
        rewrite_manifests(
            &reopened,
            "main",
            "rewriting manifests",
            8,
            None,
            CommitMethod::Amend,
        )
        .await
        .is_err()
    );
    // ...but producing a brand-new commit works.
    assert!(
        rewrite_manifests(
            &reopened,
            "main",
            "rewriting manifests",
            8,
            None,
            CommitMethod::NewCommit,
        )
        .await
        .is_ok()
    );
    Ok(())
}
#[cfg(feature = "object-store-fs")]
#[tokio_test]
async fn test_concurrent_distributer_writers_with_base_state()
-> Result<(), Box<dyn Error>> {
    // NOTE(review): "distributer" in the name looks like a typo for
    // "distributed"; kept as-is to avoid churning the test harness.
    //
    // Exercises the fork/merge ("distributed writers") workflow when the base
    // session already carries uncommitted changes of its own. Two scenarios
    // run through the same helper: once after the array was deleted and
    // re-added in the base session, and once after all of its chunk refs were
    // cleared in the base session.
    use crate::new_local_filesystem_storage;
    let repo_dir = TempDir::new()?;
    let storage: Arc<dyn Storage + Send + Sync> =
        new_local_filesystem_storage(repo_dir.path()).await?;
    let repo = Repository::create(
        None,
        Arc::clone(&storage),
        HashMap::new(),
        Some(SpecVersionBin::current()),
        true,
    )
    .await?;
    // Baseline commit: root group plus a one-dimensional array of 10 chunks.
    let mut session = repo.writable_session("main").await?;
    session.add_group(Path::root(), Bytes::copy_from_slice(b"")).await?;
    let array_path: Path = "/array".to_string().try_into().unwrap();
    let shape = ArrayShape::new(vec![(10, 10)]).unwrap();
    session
        .add_array(
            array_path.clone(),
            shape.clone(),
            Some(vec!["x".into()]),
            Bytes::from_static(b"{}"),
        )
        .await?;
    session.commit("init").execute().await?;
    // Forks `session`, distributes the fork to two "workers" as serialized
    // bytes, writes chunk 0 from one and chunk 1 from the other, then merges
    // everything back into `session` and commits. Along the way it asserts
    // the restrictions the API places on forked sessions.
    async fn do_distributed_writes(
        session: &mut Session,
        array_path: &Path,
    ) -> Result<(), Box<dyn Error>> {
        // Fork the base session and serialize it, modeling shipping the
        // session state to remote workers.
        let clean = session.fork().await?;
        let clean_bytes = clean.as_bytes()?;
        let mut s1 = Session::from_bytes(&clean_bytes)?;
        let mut s2 = Session::from_bytes(&clean_bytes)?;
        s1.set_chunk_ref(
            array_path.clone(),
            ChunkIndices(vec![0]),
            Some(ChunkPayload::Inline("0".to_string().into())),
        )
        .await?;
        s2.set_chunk_ref(
            array_path.clone(),
            ChunkIndices(vec![1]),
            Some(ChunkPayload::Inline("1".to_string().into())),
        )
        .await?;
        // Forked sessions cannot land a regular commit, even after the two
        // forks have been merged together.
        assert!(s1.commit("foo").execute().await.is_err());
        assert!(s2.commit("foo").execute().await.is_err());
        s1.merge(s2).await?;
        assert!(s1.commit("foo").execute().await.is_err());
        // Merging the (non-fork) base session into a fork is rejected.
        let base_clone = session.clone();
        assert!(matches!(
            s1.merge(base_clone).await.unwrap_err().kind,
            SessionErrorKind::MergeNotAllowed
        ));
        // An anonymous commit is the one flush a fork may perform.
        // NOTE(review): presumably this persists chunk data without moving
        // the branch ref — confirm against Session::commit docs.
        let flush_result =
            s1.clone().commit("fork-flush").anonymous().execute().await;
        assert!(flush_result.is_ok());
        // Fold the forks' writes back into the base session; both chunks
        // must be readable both before and after the real commit.
        session.merge(s1).await?;
        for i in 0..2 {
            let reader = session
                .get_chunk_reader(array_path, &ChunkIndices(vec![i]), &ByteRange::ALL)
                .await?;
            assert_eq!(get_chunk(reader).await?, Some(format!("{i}").into()));
        }
        session.commit("foo").execute().await?;
        for i in 0..2 {
            let reader = session
                .get_chunk_reader(array_path, &ChunkIndices(vec![i]), &ByteRange::ALL)
                .await?;
            assert_eq!(get_chunk(reader).await?, Some(format!("{i}").into()));
        }
        Ok(())
    }
    // Scenario 1: the base session holds a structural change (array deleted
    // and re-created) while the distributed writes happen.
    let mut session = repo.writable_session("main").await?;
    session.delete_array(array_path.clone()).await?;
    session
        .add_array(
            array_path.clone(),
            shape.clone(),
            Some(vec!["x".into()]),
            Bytes::from_static(b"{}"),
        )
        .await?;
    do_distributed_writes(&mut session, &array_path).await?;
    // Scenario 2: the base session holds chunk-level changes (all chunk refs
    // cleared) while the distributed writes happen.
    let mut session = repo.writable_session("main").await?;
    for i in 0..10 {
        session
            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![i]), None)
            .await?;
    }
    do_distributed_writes(&mut session, &array_path).await?;
    Ok(())
}
}